From a3adc499b9237022f1f269f7797929edde7588c6 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 12 Jul 2017 22:09:27 +0200 Subject: [PATCH] Pipeline metrics - report pipeline metrics on: - libbeat.pipeline.... - xpack.monitoring.pipeline... --- libbeat/cmd/test/output.go | 2 +- .../report/elasticsearch/elasticsearch.go | 13 +- libbeat/outputs/console/console.go | 41 ++++-- libbeat/outputs/console/console_test.go | 2 +- libbeat/outputs/elasticsearch/client.go | 46 +++---- .../elasticsearch/client_integration_test.go | 2 +- .../outputs/elasticsearch/elasticsearch.go | 36 +---- libbeat/outputs/fileout/file.go | 24 +++- libbeat/outputs/kafka/client.go | 29 ++-- libbeat/outputs/kafka/kafka.go | 8 +- .../outputs/kafka/kafka_integration_test.go | 2 +- libbeat/outputs/logstash/async.go | 30 ++-- libbeat/outputs/logstash/async_test.go | 2 +- libbeat/outputs/logstash/logstash.go | 33 ++--- .../logstash/logstash_integration_test.go | 2 +- libbeat/outputs/logstash/logstash_test.go | 2 +- libbeat/outputs/logstash/sync.go | 26 ++-- libbeat/outputs/logstash/sync_test.go | 2 +- libbeat/outputs/metrics.go | 102 ++++++++++++++ libbeat/outputs/output.go | 1 - libbeat/outputs/output_reg.go | 9 +- libbeat/outputs/outputs.go | 9 -- libbeat/outputs/redis/client.go | 66 +++++---- libbeat/outputs/redis/redis.go | 31 +---- .../outputs/redis/redis_integration_test.go | 2 +- libbeat/outputs/transport/client.go | 2 +- libbeat/outputs/transport/stats.go | 24 ++-- libbeat/publisher/bc/publisher/pipeline.go | 16 ++- libbeat/publisher/pipeline/batch.go | 19 ++- libbeat/publisher/pipeline/client.go | 13 +- libbeat/publisher/pipeline/controller.go | 17 ++- libbeat/publisher/pipeline/monitoring.go | 128 ++++++++++++++++++ libbeat/publisher/pipeline/output.go | 23 ++-- libbeat/publisher/pipeline/pipeline.go | 27 +++- libbeat/publisher/pipeline/retry.go | 38 +++++- 35 files changed, 575 insertions(+), 254 deletions(-) create mode 100644 libbeat/outputs/metrics.go delete mode 100644 
libbeat/outputs/output.go create mode 100644 libbeat/publisher/pipeline/monitoring.go diff --git a/libbeat/cmd/test/output.go b/libbeat/cmd/test/output.go index dbdc48fba84..b39f0d41113 100644 --- a/libbeat/cmd/test/output.go +++ b/libbeat/cmd/test/output.go @@ -28,7 +28,7 @@ func GenTestOutputCmd(name, beatVersion string) *cobra.Command { os.Exit(1) } - output, err := outputs.Load(b.Info, b.Config.Output.Name(), b.Config.Output.Config()) + output, err := outputs.Load(b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config()) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing output: %s\n", err) os.Exit(1) diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 0bf872f1b7e..94f1939209c 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -110,10 +110,15 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er Events: 20, }), nil } - pipeline, err := pipeline.New(brokerFactory, out, pipeline.Settings{ - WaitClose: 0, - WaitCloseMode: pipeline.NoWaitOnClose, - }) + + monitoring := monitoring.Default.NewRegistry("xpack.monitoring") + + pipeline, err := pipeline.New( + monitoring, + brokerFactory, out, pipeline.Settings{ + WaitClose: 0, + WaitCloseMode: pipeline.NoWaitOnClose, + }) if err != nil { return nil, err } diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 7aae0ce7cb9..5a972d4bcb9 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -17,6 +17,7 @@ import ( type console struct { out *os.File + stats *outputs.Stats writer *bufio.Writer codec codec.Codec index string @@ -33,7 +34,11 @@ func init() { outputs.RegisterType("console", makeConsole) } -func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) { +func makeConsole( + beat common.BeatInfo, + stats 
*outputs.Stats, + cfg *common.Config, +) (outputs.Group, error) { config := defaultConfig err := cfg.Unpack(&config) if err != nil { @@ -51,7 +56,7 @@ func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error } index := beat.Beat - c, err := newConsole(index, enc) + c, err := newConsole(index, stats, enc) if err != nil { return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err)) } @@ -67,46 +72,62 @@ func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error return outputs.Success(config.BatchSize, 0, c) } -func newConsole(index string, codec codec.Codec) (*console, error) { - c := &console{out: os.Stdout, codec: codec, index: index} +func newConsole(index string, stats *outputs.Stats, codec codec.Codec) (*console, error) { + c := &console{out: os.Stdout, codec: codec, stats: stats, index: index} c.writer = bufio.NewWriterSize(c.out, 8*1024) return c, nil } func (c *console) Close() error { return nil } func (c *console) Publish(batch publisher.Batch) error { + st := c.stats events := batch.Events() + st.NewBatch(len(events)) + + dropped := 0 for i := range events { - c.publishEvent(&events[i]) + ok := c.publishEvent(&events[i]) + if !ok { + dropped++ + } } c.writer.Flush() batch.ACK() + + st.Dropped(dropped) + st.Acked(len(events) - dropped) + return nil } var nl = []byte("\n") -func (c *console) publishEvent(event *publisher.Event) { +func (c *console) publishEvent(event *publisher.Event) bool { serializedEvent, err := c.codec.Encode(c.index, &event.Content) if err != nil { if !event.Guaranteed() { - return + return false } logp.Critical("Unable to encode event: %v", err) - return + return false } if err := c.writeBuffer(serializedEvent); err != nil { + c.stats.WriteError() logp.Critical("Unable to publish events to console: %v", err) - return + return false } if err := c.writeBuffer(nl); err != nil { + c.stats.WriteError() logp.Critical("Error when appending newline to event: %v", err) - 
return + return false } + + c.stats.WriteBytes(len(serializedEvent) + 1) + return true } func (c *console) writeBuffer(buf []byte) error { diff --git a/libbeat/outputs/console/console_test.go b/libbeat/outputs/console/console_test.go index 8459ca8ec90..8816d3b1760 100644 --- a/libbeat/outputs/console/console_test.go +++ b/libbeat/outputs/console/console_test.go @@ -104,7 +104,7 @@ func TestConsoleOutput(t *testing.T) { func run(codec codec.Codec, batches ...publisher.Batch) (string, error) { return withStdout(func() { - c, _ := newConsole("test", codec) + c, _ := newConsole("test", nil, codec) for _, b := range batches { c.Publish(b) } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 05a2add4c19..63fa42e0261 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -12,7 +12,6 @@ import ( "time" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/outputs/transport" @@ -41,7 +40,7 @@ type Client struct { compressionLevel int proxyURL *url.URL - stats *ClientStats + stats *outputs.Stats } // ClientSettings contains the settings for a client. 
@@ -56,14 +55,7 @@ type ClientSettings struct { Pipeline *outil.Selector Timeout time.Duration CompressionLevel int - Stats *ClientStats -} - -type ClientStats struct { - PublishCallCount *monitoring.Int - EventsACKed *monitoring.Int - EventsFailed *monitoring.Int - IO *transport.IOStats + Stats *outputs.Stats } type connectCallback func(client *Client) error @@ -139,9 +131,9 @@ func NewClient( return nil, err } - if st := s.Stats; st != nil && st.IO != nil { - dialer = transport.StatsDialer(dialer, st.IO) - tlsDialer = transport.StatsDialer(tlsDialer, st.IO) + if st := s.Stats; st != nil { + dialer = transport.StatsDialer(dialer, st) + tlsDialer = transport.StatsDialer(tlsDialer, st) } params := s.Parameters @@ -251,8 +243,10 @@ func (client *Client) publishEvents( data []publisher.Event, ) ([]publisher.Event, error) { begin := time.Now() - if st := client.stats; st != nil && st.PublishCallCount != nil { - st.PublishCallCount.Add(1) + st := client.stats + + if st != nil { + st.NewBatch(len(data)) } if len(data) == 0 { @@ -264,8 +258,14 @@ func (client *Client) publishEvents( // encode events into bulk request buffer, dropping failed elements from // events slice + + origCount := len(data) data = bulkEncodePublishRequest(body, client.index, client.pipeline, data) - if len(data) == 0 { + newCount := len(data) + if st != nil && origCount > newCount { + st.Dropped(origCount - newCount) + } + if newCount == 0 { return nil, nil } @@ -290,22 +290,20 @@ func (client *Client) publishEvents( failedEvents = bulkCollectPublishFails(&client.json, data) } + failed := len(failedEvents) if st := client.stats; st != nil { - countOK := int64(len(data) - len(failedEvents)) - st.EventsACKed.Add(countOK) - outputs.AckedEvents.Add(countOK) - if failed := int64(len(failedEvents)); failed > 0 { - st.EventsFailed.Add(failed) - } + acked := len(data) - failed + + st.Acked(acked) + st.Failed(failed) } - if len(failedEvents) > 0 { + if failed > 0 { if sendErr == nil { sendErr = 
errTempBulkFailure } return failedEvents, sendErr } - return nil, nil } diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 6db70f754f3..0c3e3be50d8 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -249,7 +249,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) { t.Fatal(err) } - output, err := makeES(common.BeatInfo{Beat: "libbeat"}, config) + output, err := makeES(common.BeatInfo{Beat: "libbeat"}, nil, config) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 964a65085b9..cabc5fd2d30 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -7,10 +7,8 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/outil" - "github.com/elastic/beats/libbeat/outputs/transport" ) func init() { @@ -40,20 +38,6 @@ type callbacksRegistry struct { // XXX: it would be fantastic to do this without a package global var connectCallbackRegistry callbacksRegistry -// Metrics that can retrieved through the expvar web interface. 
-var ( - esMetrics = outputs.Metrics.NewRegistry("elasticsearch") - - ackedEvents = monitoring.NewInt(esMetrics, "events.acked") - eventsNotAcked = monitoring.NewInt(esMetrics, "events.not_acked") - publishEventsCallCount = monitoring.NewInt(esMetrics, "publishEvents.call.count") - - statReadBytes = monitoring.NewInt(esMetrics, "read.bytes") - statWriteBytes = monitoring.NewInt(esMetrics, "write.bytes") - statReadErrors = monitoring.NewInt(esMetrics, "read.errors") - statWriteErrors = monitoring.NewInt(esMetrics, "write.errors") -) - // RegisterConnectCallback registers a callback for the elasticsearch output // The callback is called each time the client connects to elasticsearch. func RegisterConnectCallback(callback connectCallback) { @@ -62,7 +46,11 @@ func RegisterConnectCallback(callback connectCallback) { connectCallbackRegistry.callbacks = append(connectCallbackRegistry.callbacks, callback) } -func makeES(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) { +func makeES( + beat common.BeatInfo, + stats *outputs.Stats, + cfg *common.Config, +) (outputs.Group, error) { if !cfg.HasField("bulk_max_size") { cfg.SetInt("bulk_max_size", -1, defaultBulkSize) } @@ -125,20 +113,6 @@ func makeES(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) { params = nil } - stats := &ClientStats{ - PublishCallCount: publishEventsCallCount, - EventsACKed: ackedEvents, - EventsFailed: eventsNotAcked, - IO: &transport.IOStats{ - Read: statReadBytes, - Write: statWriteBytes, - ReadErrors: statReadErrors, - WriteErrors: statWriteErrors, - OutputsWrite: outputs.WriteBytes, - OutputsWriteErrors: outputs.WriteErrors, - }, - } - clients := make([]outputs.NetworkClient, len(hosts)) for i, host := range hosts { esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index beaa5cb6211..b19096a48bb 100644 --- a/libbeat/outputs/fileout/file.go +++ 
b/libbeat/outputs/fileout/file.go @@ -14,12 +14,17 @@ func init() { type fileOutput struct { beat common.BeatInfo + stats *outputs.Stats rotator logp.FileRotator codec codec.Codec } // New instantiates a new file output instance. -func makeFileout(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) { +func makeFileout( + beat common.BeatInfo, + stats *outputs.Stats, + cfg *common.Config, +) (outputs.Group, error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { return outputs.Fail(err) @@ -29,7 +34,7 @@ func makeFileout(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error cfg.SetInt("flush_interval", -1, -1) cfg.SetInt("bulk_max_size", -1, -1) - fo := &fileOutput{beat: beat} + fo := &fileOutput{beat: beat, stats: stats} if err := fo.init(config); err != nil { return outputs.Fail(err) } @@ -87,7 +92,11 @@ func (out *fileOutput) Publish( ) error { defer batch.ACK() + st := out.stats events := batch.Events() + st.NewBatch(len(events)) + + dropped := 0 for i := range events { event := &events[i] @@ -98,19 +107,30 @@ func (out *fileOutput) Publish( } else { logp.Warn("Failed to serialize the event: %v", err) } + + dropped++ continue } err = out.rotator.WriteLine(serializedEvent) if err != nil { + st.WriteError() + if event.Guaranteed() { logp.Critical("Writing event to file failed with: %v", err) } else { logp.Warn("Writing event to file failed with: %v", err) } + + dropped++ continue } + + st.WriteBytes(len(serializedEvent) + 1) } + st.Dropped(dropped) + st.Acked(len(events) - dropped) + return nil } diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 9da9d37c0cd..79161f10986 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -9,7 +9,6 @@ import ( "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" 
"github.com/elastic/beats/libbeat/outputs/codec" "github.com/elastic/beats/libbeat/outputs/outil" @@ -17,6 +16,7 @@ import ( ) type client struct { + stats *outputs.Stats hosts []string topic outil.Selector key *fmtstr.EventFormatString @@ -30,6 +30,7 @@ type client struct { } type msgRef struct { + client *client count int32 total int failed []publisher.Event @@ -38,15 +39,8 @@ type msgRef struct { err error } -var ( - kafkaMetrics = outputs.Metrics.NewRegistry("kafka") - - ackedEvents = monitoring.NewInt(kafkaMetrics, "events.acked") - eventsNotAcked = monitoring.NewInt(kafkaMetrics, "events.not_acked") - publishEventsCallCount = monitoring.NewInt(kafkaMetrics, "publishEvents.call.count") -) - func newKafkaClient( + stats *outputs.Stats, hosts []string, index string, key *fmtstr.EventFormatString, @@ -55,6 +49,7 @@ func newKafkaClient( cfg *sarama.Config, ) (*client, error) { c := &client{ + stats: stats, hosts: hosts, topic: topic, key: key, @@ -94,12 +89,11 @@ func (c *client) Close() error { } func (c *client) Publish(batch publisher.Batch) error { - publishEventsCallCount.Add(1) - debugf("publish events") - events := batch.Events() + c.stats.NewBatch(len(events)) ref := &msgRef{ + client: c, count: int32(len(events)), total: len(events), failed: nil, @@ -113,6 +107,7 @@ func (c *client) Publish(batch publisher.Batch) error { if err != nil { logp.Err("Dropping event: %v", err) ref.done() + c.stats.Dropped(1) continue } @@ -223,6 +218,7 @@ func (r *msgRef) dec() { } debugf("finished kafka batch") + stats := r.client.stats err := r.err if err != nil { @@ -230,17 +226,14 @@ func (r *msgRef) dec() { success := r.total - failed r.batch.RetryEvents(r.failed) - eventsNotAcked.Add(int64(failed)) + stats.Failed(failed) if success > 0 { - ackedEvents.Add(int64(success)) - outputs.AckedEvents.Add(int64(success)) + stats.Acked(success) } debugf("Kafka publish failed with: %v", err) } else { r.batch.ACK() - - ackedEvents.Add(int64(r.total)) - 
outputs.AckedEvents.Add(int64(r.total)) + stats.Acked(r.total) } } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 02d344b79f6..f867c47445a 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -94,7 +94,11 @@ func kafkaMetricsRegistry() gometrics.Registry { return kafkaMetricsRegistryInstance } -func makeKafka(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) { +func makeKafka( + beat common.BeatInfo, + stats *outputs.Stats, + cfg *common.Config, +) (outputs.Group, error) { debugf("initialize kafka output") config := defaultConfig @@ -127,7 +131,7 @@ func makeKafka(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) return outputs.Fail(err) } - client, err := newKafkaClient(hosts, beat.Beat, config.Key, topic, codec, libCfg) + client, err := newKafkaClient(stats, hosts, beat.Beat, config.Key, topic, codec, libCfg) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 4a4988afee1..1b6fc10c3d1 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -185,7 +185,7 @@ func TestKafkaPublish(t *testing.T) { } t.Run(name, func(t *testing.T) { - grp, err := makeKafka(common.BeatInfo{Beat: "libbeat"}, cfg) + grp, err := makeKafka(common.BeatInfo{Beat: "libbeat"}, nil, cfg) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 8bbc3d06f5a..92b9ea3813a 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -14,6 +14,7 @@ import ( type asyncClient struct { *transport.Client + stats *outputs.Stats client *v2.AsyncClient win window @@ -21,6 +22,7 @@ type asyncClient struct { } type msgRef struct { + client *asyncClient count atomic.Uint32 batch publisher.Batch slice []publisher.Event @@ -29,9 +31,14 @@ type msgRef struct { 
batchSize int } -func newAsyncClient(conn *transport.Client, config *Config) (*asyncClient, error) { +func newAsyncClient( + conn *transport.Client, + stats *outputs.Stats, + config *Config, +) (*asyncClient, error) { c := &asyncClient{} c.Client = conn + c.stats = stats c.win.init(defaultStartMaxWindowSize, config.BulkMaxSize) if config.TTL != 0 { @@ -97,9 +104,10 @@ func (c *asyncClient) BatchSize() int { } func (c *asyncClient) Publish(batch publisher.Batch) error { - publishEventsCallCount.Add(1) - + st := c.stats events := batch.Events() + st.NewBatch(len(events)) + if len(events) == 0 { batch.ACK() return nil @@ -111,6 +119,7 @@ func (c *asyncClient) Publish(batch publisher.Batch) error { } ref := &msgRef{ + client: c, count: atomic.MakeUint32(1), batch: batch, slice: events, @@ -177,22 +186,21 @@ func (r *msgRef) callback(seq uint32, err error) { } func (r *msgRef) done(n uint32) { - ackedEvents.Add(int64(n)) - outputs.AckedEvents.Add(int64(n)) + r.client.stats.Acked(int(n)) r.slice = r.slice[n:] r.win.tryGrowWindow(r.batchSize) r.dec() } func (r *msgRef) fail(n uint32, err error) { - ackedEvents.Add(int64(n)) - outputs.AckedEvents.Add(int64(n)) - if r.err == nil { r.err = err } r.slice = r.slice[n:] r.win.shrinkWindow() + + r.client.stats.Acked(int(n)) + r.dec() } @@ -202,14 +210,16 @@ func (r *msgRef) dec() { return } + if L := len(r.slice); L > 0 { + r.client.stats.Failed(L) + } + err := r.err if err == nil { r.batch.ACK() return } - rest := int64(len(r.slice)) r.batch.RetryEvents(r.slice) - eventsNotAcked.Add(rest) logp.Err("Failed to publish events caused by: %v", err) } diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go index fc9ea4d55ba..4964b187599 100644 --- a/libbeat/outputs/logstash/async_test.go +++ b/libbeat/outputs/logstash/async_test.go @@ -35,7 +35,7 @@ func makeAsyncTestClient(conn *transport.Client) testClientDriver { config := defaultConfig config.Timeout = 1 * time.Second config.Pipelining = 3 - 
client, err := newAsyncClient(conn, &config) + client, err := newAsyncClient(conn, nil, &config) if err != nil { panic(err) } diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 4ca974cff8b..257f060663f 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -3,7 +3,6 @@ package logstash import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/transport" ) @@ -13,26 +12,17 @@ const ( defaultStartMaxWindowSize int = 10 ) -var ( - logstashMetrics = outputs.Metrics.NewRegistry("logstash") - - ackedEvents = monitoring.NewInt(logstashMetrics, "events.acked") - eventsNotAcked = monitoring.NewInt(logstashMetrics, "events.not_acked") - publishEventsCallCount = monitoring.NewInt(logstashMetrics, "publishEvents.call.count") - - statReadBytes = monitoring.NewInt(logstashMetrics, "read.bytes") - statWriteBytes = monitoring.NewInt(logstashMetrics, "write.bytes") - statReadErrors = monitoring.NewInt(logstashMetrics, "read.errors") - statWriteErrors = monitoring.NewInt(logstashMetrics, "write.errors") -) - var debugf = logp.MakeDebug("logstash") func init() { outputs.RegisterType("logstash", makeLogstash) } -func makeLogstash(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) { +func makeLogstash( + beat common.BeatInfo, + stats *outputs.Stats, + cfg *common.Config, +) (outputs.Group, error) { if !cfg.HasField("index") { cfg.SetString("index", -1, beat.Beat) } @@ -56,14 +46,7 @@ func makeLogstash(beat common.BeatInfo, cfg *common.Config) (outputs.Group, erro Timeout: config.Timeout, Proxy: &config.Proxy, TLS: tls, - Stats: &transport.IOStats{ - Read: statReadBytes, - Write: statWriteBytes, - ReadErrors: statReadErrors, - WriteErrors: statWriteErrors, - OutputsWrite: outputs.WriteBytes, - OutputsWriteErrors: 
outputs.WriteErrors, - }, + Stats: stats, } clients := make([]outputs.NetworkClient, len(hosts)) @@ -76,9 +59,9 @@ func makeLogstash(beat common.BeatInfo, cfg *common.Config) (outputs.Group, erro } if config.Pipelining > 0 { - client, err = newAsyncClient(conn, config) + client, err = newAsyncClient(conn, stats, config) } else { - client, err = newSyncClient(conn, config) + client, err = newSyncClient(conn, stats, config) } if err != nil { return outputs.Fail(err) diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 44bd48daad1..ab1824d1ea7 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -159,7 +159,7 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { "template.enabled": false, }) - grp, err := plugin(common.BeatInfo{Beat: "libbeat"}, config) + grp, err := plugin(common.BeatInfo{Beat: "libbeat"}, nil, config) if err != nil { t.Fatalf("init elasticsearch output plugin failed: %v", err) } diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index 50ca5cd731c..ab914e724f9 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -164,7 +164,7 @@ func newTestLumberjackOutput( } cfg, _ := common.NewConfigFrom(config) - grp, err := outputs.Load(common.BeatInfo{}, "logstash", cfg) + grp, err := outputs.Load(common.BeatInfo{}, nil, "logstash", cfg) if err != nil { t.Fatalf("init logstash output plugin failed: %v", err) } diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index f3e0f61dc52..9caf46cf495 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -13,15 +13,21 @@ import ( type syncClient struct { *transport.Client client *v2.SyncClient + stats *outputs.Stats win window ttl time.Duration ticker *time.Ticker } -func 
newSyncClient(conn *transport.Client, config *Config) (*syncClient, error) { +func newSyncClient( + conn *transport.Client, + stats *outputs.Stats, + config *Config, +) (*syncClient, error) { c := &syncClient{} c.Client = conn c.ttl = config.TTL + c.stats = stats c.win.init(defaultStartMaxWindowSize, config.BulkMaxSize) if c.ttl > 0 { c.ticker = time.NewTicker(c.ttl) @@ -71,14 +77,15 @@ func (c *syncClient) reconnect() error { func (c *syncClient) Publish(batch publisher.Batch) error { events := batch.Events() + st := c.stats + + st.NewBatch(len(events)) + if len(events) == 0 { batch.ACK() return nil } - publishEventsCallCount.Add(1) - totalNumberOfEvents := int64(len(events)) - for len(events) > 0 { // check if we need to reconnect if c.ticker != nil { @@ -96,6 +103,7 @@ func (c *syncClient) Publish(batch publisher.Batch) error { n, err := c.publishWindowed(events) events = events[n:] + st.Acked(n) debugf("%v events out of %v events sent to logstash. Continue sending", n, len(events)) @@ -109,20 +117,14 @@ func (c *syncClient) Publish(batch publisher.Batch) error { logp.Err("Failed to publish events caused by: %v", err) - rest := int64(len(events)) - acked := totalNumberOfEvents - rest - - eventsNotAcked.Add(rest) - ackedEvents.Add(acked) - outputs.AckedEvents.Add(acked) + rest := len(events) + st.Failed(rest) return err } } batch.ACK() - ackedEvents.Add(totalNumberOfEvents) - outputs.AckedEvents.Add(totalNumberOfEvents) return nil } diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index 93dd3b8d8ef..932aa484ceb 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -48,7 +48,7 @@ func makeTestClient(conn *transport.Client) testClientDriver { config := defaultConfig config.Timeout = 1 * time.Second config.TTL = 5 * time.Second - client, err := newSyncClient(conn, &config) + client, err := newSyncClient(conn, nil, &config) if err != nil { panic(err) } diff --git 
a/libbeat/outputs/metrics.go b/libbeat/outputs/metrics.go new file mode 100644 index 00000000000..6bb6540fe4c --- /dev/null +++ b/libbeat/outputs/metrics.go @@ -0,0 +1,102 @@ +package outputs + +import "github.com/elastic/beats/libbeat/monitoring" + +// Stats provides a common type used by outputs to report common events. +// The output events will update a set of unified output metrics in the +// underlying monitoring.Registry. +type Stats struct { + // + // Output event stats + // + batches *monitoring.Uint // total number of batches processed by output + events *monitoring.Uint // total number of events processed by output + + acked *monitoring.Uint // total number of events ACKed by output + failed *monitoring.Uint // total number of events failed in output + active *monitoring.Uint // events sent and waiting for ACK/fail from output + + // + // Output network connection stats + // + writeBytes *monitoring.Uint // total amount of bytes written by output + writeErrors *monitoring.Uint // total number of errors on write + + readBytes *monitoring.Uint // total amount of bytes read + readErrors *monitoring.Uint // total number of errors while waiting for response on output +} + +func MakeStats(reg *monitoring.Registry) Stats { + return Stats{ + batches: monitoring.NewUint(reg, "events.batches"), + events: monitoring.NewUint(reg, "events.total"), + acked: monitoring.NewUint(reg, "events.acked"), + failed: monitoring.NewUint(reg, "events.failed"), + active: monitoring.NewUint(reg, "events.active"), + + writeBytes: monitoring.NewUint(reg, "write.bytes"), + writeErrors: monitoring.NewUint(reg, "write.errors"), + + readBytes: monitoring.NewUint(reg, "read.bytes"), + readErrors: monitoring.NewUint(reg, "read.errors"), + } +} + +func (s *Stats) NewBatch(n int) { + if s != nil { + s.batches.Inc() + s.events.Add(uint64(n)) + s.active.Add(uint64(n)) + } +} + +func (s *Stats) Acked(n int) { + if s != nil { + s.acked.Add(uint64(n)) + s.active.Sub(uint64(n)) + } +} + +func (s 
*Stats) Failed(n int) { + if s != nil { + s.failed.Add(uint64(n)) + s.active.Sub(uint64(n)) + } +} + +func (s *Stats) Dropped(n int) { + // number of dropped events (e.g. encoding failures) + if s != nil { + s.active.Sub(uint64(n)) + } +} + +func (s *Stats) Cancelled(n int) { + if s != nil { + s.active.Sub(uint64(n)) + } +} + +func (s *Stats) WriteError() { + if s != nil { + s.writeErrors.Inc() + } +} + +func (s *Stats) WriteBytes(n int) { + if s != nil { + s.writeBytes.Add(uint64(n)) + } +} + +func (s *Stats) ReadError() { + if s != nil { + s.readErrors.Inc() + } +} + +func (s *Stats) ReadBytes(n int) { + if s != nil { + s.readBytes.Add(uint64(n)) + } +} diff --git a/libbeat/outputs/output.go b/libbeat/outputs/output.go deleted file mode 100644 index b7ed1c19738..00000000000 --- a/libbeat/outputs/output.go +++ /dev/null @@ -1 +0,0 @@ -package outputs diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go index 7449715f1a6..a8f176c3def 100644 --- a/libbeat/outputs/output_reg.go +++ b/libbeat/outputs/output_reg.go @@ -9,7 +9,10 @@ import ( var outputReg = map[string]Factory{} // Factory is used by output plugins to build an output instance -type Factory func(beat common.BeatInfo, cfg *common.Config) (Group, error) +type Factory func( + beat common.BeatInfo, + stats *Stats, + cfg *common.Config) (Group, error) // Group configures and combines multiple clients into load-balanced group of clients // being managed by the publisher pipeline. @@ -33,11 +36,11 @@ func FindFactory(name string) Factory { } // Load creates and configures a output Group using a configuration object.. 
-func Load(info common.BeatInfo, name string, config *common.Config) (Group, error) { +func Load(info common.BeatInfo, stats *Stats, name string, config *common.Config) (Group, error) { factory := FindFactory(name) if factory == nil { return Group{}, fmt.Errorf("output type %v undefined", name) } - return factory(info, config) + return factory(info, stats, config) } diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index f653a156241..26d2a5854e3 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -4,18 +4,9 @@ package outputs import ( - "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/publisher" ) -var ( - Metrics = monitoring.Default.NewRegistry("output") - - AckedEvents = monitoring.NewInt(Metrics, "events.acked", monitoring.Report) - WriteBytes = monitoring.NewInt(Metrics, "write.bytes", monitoring.Report) - WriteErrors = monitoring.NewInt(Metrics, "write.errors", monitoring.Report) -) - // Client provides the minimal interface an output must implement to be usable // with the publisher pipeline. 
type Client interface { diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index 4da46dfb16a..8d34770cc31 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -29,6 +29,7 @@ type publishFn func( type client struct { *transport.Client + stats *outputs.Stats index string dataType redisDataType db int @@ -46,9 +47,17 @@ const ( redisChannelType ) -func newClient(tc *transport.Client, timeout time.Duration, pass string, db int, key outil.Selector, dt redisDataType, index string, codec codec.Codec) *client { +func newClient( + tc *transport.Client, + stats *outputs.Stats, + timeout time.Duration, + pass string, + db int, key outil.Selector, dt redisDataType, + index string, codec codec.Codec, +) *client { return &client{ Client: tc, + stats: stats, timeout: timeout, password: pass, index: index, @@ -75,7 +84,7 @@ func (c *client) Connect() error { }() if err = initRedisConn(conn, c.password, c.db); err == nil { - c.publish, err = makePublish(conn, c.key, c.dataType, c.index, c.codec) + c.publish, err = c.makePublish(conn) } return err } @@ -114,30 +123,28 @@ func (c *client) Publish(batch publisher.Batch) error { } events := batch.Events() + c.stats.NewBatch(len(events)) rest, err := c.publish(c.key, events) if rest != nil { + c.stats.Failed(len(rest)) batch.RetryEvents(rest) } return err } -func makePublish( +func (c *client) makePublish( conn redis.Conn, - key outil.Selector, - dt redisDataType, - index string, - codec codec.Codec, ) (publishFn, error) { - if dt == redisChannelType { - return makePublishPUBLISH(conn, index, codec) + if c.dataType == redisChannelType { + return c.makePublishPUBLISH(conn) } - return makePublishRPUSH(conn, key, index, codec) + return c.makePublishRPUSH(conn) } -func makePublishRPUSH(conn redis.Conn, key outil.Selector, index string, codec codec.Codec) (publishFn, error) { - if !key.IsConst() { +func (c *client) makePublishRPUSH(conn redis.Conn) (publishFn, error) { + if 
!c.key.IsConst() { // TODO: more clever bulk handling batching events with same key - return publishEventsPipeline(conn, "RPUSH", index, codec), nil + return c.publishEventsPipeline(conn, "RPUSH"), nil } var major, minor int @@ -176,23 +183,24 @@ func makePublishRPUSH(conn redis.Conn, key outil.Selector, index string, codec c // See: http://redis.io/commands/rpush multiValue := major > 2 || (major == 2 && minor >= 4) if multiValue { - return publishEventsBulk(conn, key, "RPUSH", index, codec), nil + return c.publishEventsBulk(conn, "RPUSH"), nil } - return publishEventsPipeline(conn, "RPUSH", index, codec), nil + return c.publishEventsPipeline(conn, "RPUSH"), nil } -func makePublishPUBLISH(conn redis.Conn, index string, codec codec.Codec) (publishFn, error) { - return publishEventsPipeline(conn, "PUBLISH", index, codec), nil +func (c *client) makePublishPUBLISH(conn redis.Conn) (publishFn, error) { + return c.publishEventsPipeline(conn, "PUBLISH"), nil } -func publishEventsBulk(conn redis.Conn, key outil.Selector, command string, index string, codec codec.Codec) publishFn { +func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { // XXX: requires key.IsConst() == true - dest, _ := key.Select(&beat.Event{Fields: common.MapStr{}}) + dest, _ := c.key.Select(&beat.Event{Fields: common.MapStr{}}) return func(_ outil.Selector, data []publisher.Event) ([]publisher.Event, error) { args := make([]interface{}, 1, len(data)+1) args[0] = dest - data, args = serializeEvents(args, 1, data, index, codec) + okEvents, args := serializeEvents(args, 1, data, c.index, c.codec) + c.stats.Dropped(len(data) - len(okEvents)) if (len(args) - 1) == 0 { return nil, nil } @@ -201,30 +209,32 @@ func publishEventsBulk(conn redis.Conn, key outil.Selector, command string, inde _, err := conn.Do(command, args...) 
if err != nil { logp.Err("Failed to %v to redis list with %v", command, err) - return data, err + return okEvents, err } - ackedEvents.Add(int64(len(data))) - outputs.AckedEvents.Add(int64(len(data))) + c.stats.Acked(len(okEvents)) return nil, nil } } -func publishEventsPipeline(conn redis.Conn, command string, index string, codec codec.Codec) publishFn { +func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishFn { return func(key outil.Selector, data []publisher.Event) ([]publisher.Event, error) { var okEvents []publisher.Event serialized := make([]interface{}, 0, len(data)) - okEvents, serialized = serializeEvents(serialized, 0, data, index, codec) + okEvents, serialized = serializeEvents(serialized, 0, data, c.index, c.codec) + c.stats.Dropped(len(data) - len(okEvents)) if len(serialized) == 0 { return nil, nil } data = okEvents[:0] + dropped := 0 for i, serializedEvent := range serialized { eventKey, err := key.Select(&okEvents[i].Content) if err != nil { logp.Err("Failed to set redis key: %v", err) + dropped++ continue } @@ -234,6 +244,7 @@ func publishEventsPipeline(conn redis.Conn, command string, index string, codec return okEvents, err } } + c.stats.Dropped(dropped) if err := conn.Flush(); err != nil { return data, err @@ -258,9 +269,8 @@ func publishEventsPipeline(conn redis.Conn, command string, index string, codec } } } - ackedEvents.Add(int64(len(okEvents) - len(failed))) - outputs.AckedEvents.Add(int64(len(okEvents) - len(failed))) - eventsNotAcked.Add(int64(len(failed))) + + c.stats.Acked(len(okEvents) - len(failed)) return failed, lastErr } } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 96378095084..82dce9d97a0 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -6,7 +6,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" 
"github.com/elastic/beats/libbeat/outputs/codec" "github.com/elastic/beats/libbeat/outputs/outil" @@ -19,19 +18,6 @@ type redisOut struct { var debugf = logp.MakeDebug("redis") -// Metrics that can retrieved through the expvar web interface. -var ( - redisMetrics = outputs.Metrics.NewRegistry("redis") - - ackedEvents = monitoring.NewInt(redisMetrics, "events.acked") - eventsNotAcked = monitoring.NewInt(redisMetrics, "events.not_acked") - - statReadBytes = monitoring.NewInt(redisMetrics, "read.bytes") - statWriteBytes = monitoring.NewInt(redisMetrics, "write.bytes") - statReadErrors = monitoring.NewInt(redisMetrics, "read.errors") - statWriteErrors = monitoring.NewInt(redisMetrics, "write.errors") -) - const ( defaultWaitRetry = 1 * time.Second defaultMaxWaitRetry = 60 * time.Second @@ -41,7 +27,11 @@ func init() { outputs.RegisterType("redis", makeRedis) } -func makeRedis(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) { +func makeRedis( + beat common.BeatInfo, + stats *outputs.Stats, + cfg *common.Config, +) (outputs.Group, error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { return outputs.Fail(err) @@ -98,14 +88,7 @@ func makeRedis(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) Timeout: config.Timeout, Proxy: &config.Proxy, TLS: tls, - Stats: &transport.IOStats{ - Read: statReadBytes, - Write: statWriteBytes, - ReadErrors: statReadErrors, - WriteErrors: statWriteErrors, - OutputsWrite: outputs.WriteBytes, - OutputsWriteErrors: outputs.WriteErrors, - }, + Stats: stats, } clients := make([]outputs.NetworkClient, len(hosts)) @@ -120,7 +103,7 @@ func makeRedis(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) return outputs.Fail(err) } - clients[i] = newClient(conn, config.Timeout, + clients[i] = newClient(conn, stats, config.Timeout, config.Password, config.Db, key, dataType, config.Index, enc) } diff --git a/libbeat/outputs/redis/redis_integration_test.go 
b/libbeat/outputs/redis/redis_integration_test.go index a87ec8de882..a2aee71e386 100644 --- a/libbeat/outputs/redis/redis_integration_test.go +++ b/libbeat/outputs/redis/redis_integration_test.go @@ -261,7 +261,7 @@ func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) *client { t.Fatalf("redis output module not registered") } - out, err := plugin(common.BeatInfo{Beat: "libbeat"}, config) + out, err := plugin(common.BeatInfo{Beat: "libbeat"}, nil, config) if err != nil { t.Fatalf("Failed to initialize redis output: %v", err) } diff --git a/libbeat/outputs/transport/client.go b/libbeat/outputs/transport/client.go index f804a1f2c2d..9339664b96b 100644 --- a/libbeat/outputs/transport/client.go +++ b/libbeat/outputs/transport/client.go @@ -23,7 +23,7 @@ type Config struct { Proxy *ProxyConfig TLS *TLSConfig Timeout time.Duration - Stats *IOStats + Stats IOStatser } func MakeDialer(c *Config) (Dialer, error) { diff --git a/libbeat/outputs/transport/stats.go b/libbeat/outputs/transport/stats.go index e20d7133c8a..0626f35eb11 100644 --- a/libbeat/outputs/transport/stats.go +++ b/libbeat/outputs/transport/stats.go @@ -2,20 +2,22 @@ package transport import ( "net" - - "github.com/elastic/beats/libbeat/monitoring" ) -type IOStats struct { - Read, Write, ReadErrors, WriteErrors, OutputsWrite, OutputsWriteErrors *monitoring.Int +type IOStatser interface { + WriteError() + WriteBytes(int) + + ReadError() + ReadBytes(int) } type statsConn struct { net.Conn - stats *IOStats + stats IOStatser } -func StatsDialer(d Dialer, s *IOStats) Dialer { +func StatsDialer(d Dialer, s IOStatser) Dialer { return ConnWrapper(d, func(c net.Conn) net.Conn { return &statsConn{c, s} }) @@ -24,19 +26,17 @@ func StatsDialer(d Dialer, s *IOStats) Dialer { func (s *statsConn) Read(b []byte) (int, error) { n, err := s.Conn.Read(b) if err != nil { - s.stats.ReadErrors.Add(1) + s.stats.ReadError() } - s.stats.Read.Add(int64(n)) + s.stats.ReadBytes(n) return n, err } func (s *statsConn) 
Write(b []byte) (int, error) { n, err := s.Conn.Write(b) if err != nil { - s.stats.WriteErrors.Add(1) - s.stats.OutputsWriteErrors.Add(1) + s.stats.WriteError() } - s.stats.Write.Add(int64(n)) - s.stats.OutputsWrite.Add(int64(n)) + s.stats.WriteBytes(n) return n, err } diff --git a/libbeat/publisher/bc/publisher/pipeline.go b/libbeat/publisher/bc/publisher/pipeline.go index 9e8e8845fe7..4aa30378ed7 100644 --- a/libbeat/publisher/bc/publisher/pipeline.go +++ b/libbeat/publisher/bc/publisher/pipeline.go @@ -6,6 +6,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/publisher/broker" @@ -18,6 +19,12 @@ func createPipeline( processors *processors.Processors, outcfg common.ConfigNamespace, ) (*pipeline.Pipeline, error) { + + reg := monitoring.Default.GetRegistry("libbeat") + if reg == nil { + reg = monitoring.Default.NewRegistry("libbeat") + } + var out outputs.Group if !(*publishDisabled) { var err error @@ -28,10 +35,16 @@ func createPipeline( return nil, errors.New(msg) } - out, err = outputs.Load(beatInfo, outcfg.Name(), outcfg.Config()) + // TODO: add support to unload/reassign outStats on output reloading + outReg := reg.NewRegistry("output") + outStats := outputs.MakeStats(outReg) + + out, err = outputs.Load(beatInfo, &outStats, outcfg.Name(), outcfg.Config()) if err != nil { return nil, err } + + monitoring.NewString(outReg, "type").Set(outcfg.Name()) } name := shipper.Name @@ -70,6 +83,7 @@ func createPipeline( } p, err := pipeline.New( + monitoring.Default.GetRegistry("libbeat"), func(eventer broker.Eventer) (broker.Broker, error) { return brokerFactory(eventer, brokerConfig) }, diff --git a/libbeat/publisher/pipeline/batch.go b/libbeat/publisher/pipeline/batch.go index 1a4af5377af..06d28276442 100644 --- a/libbeat/publisher/pipeline/batch.go +++ 
b/libbeat/publisher/pipeline/batch.go @@ -15,7 +15,8 @@ type Batch struct { } type batchContext struct { - retryer *retryer + observer *observer + retryer *retryer } var batchPool = sync.Pool{ @@ -49,6 +50,7 @@ func (b *Batch) Events() []publisher.Event { } func (b *Batch) ACK() { + b.ctx.observer.outBatchACKed(len(b.events)) b.original.ACK() releaseBatch(b) } @@ -67,11 +69,22 @@ func (b *Batch) Cancelled() { } func (b *Batch) RetryEvents(events []publisher.Event) { - b.events = events + b.updEvents(events) b.Retry() } func (b *Batch) CancelledEvents(events []publisher.Event) { - b.events = events + b.updEvents(events) b.Cancelled() } + +func (b *Batch) updEvents(events []publisher.Event) { + l1 := len(b.events) + l2 := len(events) + if l1 > l2 { + // report subset of events not to be retried as ACKed + b.ctx.observer.outBatchACKed(l1 - l2) + } + + b.events = events +} diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index c2fa56e638e..1e1d92d21f2 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -51,6 +51,8 @@ func (c *client) publish(e beat.Event) { log = c.pipeline.logger ) + c.onNewEvent() + if c.processors != nil { var err error @@ -113,7 +115,6 @@ func (c *client) Close() error { log := c.pipeline.logger c.onClosing() - defer c.onClosed() log.Debug("client: closing acker") c.acker.close() @@ -130,34 +131,44 @@ func (c *client) Close() error { } } + c.onClosed() return nil } func (c *client) onClosing() { + c.pipeline.observer.clientClosing() if c.eventer != nil { c.eventer.Closing() } } func (c *client) onClosed() { + c.pipeline.observer.clientClosed() if c.eventer != nil { c.eventer.Closed() } } +func (c *client) onNewEvent() { + c.pipeline.observer.newEvent() +} + func (c *client) onPublished() { + c.pipeline.observer.publishedEvent() if c.eventer != nil { c.eventer.Published() } } func (c *client) onFilteredOut(e beat.Event) { + c.pipeline.observer.filteredEvent() if 
c.eventer != nil { c.eventer.FilteredOut(e) } } func (c *client) onDroppedOnPublish(e beat.Event) { + c.pipeline.observer.failedPublishEvent() if c.eventer != nil { c.eventer.DroppedOnPublish(e) } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index ee9949b4ba0..1b8b9424503 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -11,7 +11,9 @@ import ( // - stop // - reload type outputController struct { - logger *logp.Logger + logger *logp.Logger + observer *observer + broker broker.Broker retryer *retryer @@ -38,16 +40,19 @@ type outputWorker interface { func newOutputController( log *logp.Logger, + observer *observer, b broker.Broker, ) *outputController { c := &outputController{ - logger: log, - broker: b, + logger: log, + observer: observer, + broker: b, } ctx := &batchContext{} c.consumer = newEventConsumer(log, b, ctx) - c.retryer = newRetryer(log, nil, c.consumer) + c.retryer = newRetryer(log, observer, nil, c.consumer) + ctx.observer = c.observer ctx.retryer = c.retryer c.consumer.sigContinue() @@ -77,7 +82,7 @@ func (c *outputController) Set(outGrp outputs.Group) { queue := makeWorkQueue() worker := make([]outputWorker, len(clients)) for i, client := range clients { - worker[i] = makeClientWorker(queue, client) + worker[i] = makeClientWorker(c.observer, queue, client) } grp := &outputGroup{ workQueue: queue, @@ -108,6 +113,8 @@ func (c *outputController) Set(outGrp outputs.Group) { // restart consumer (potentially blocked by retryer) c.consumer.sigContinue() + + c.observer.updateOutputGroup() } func makeWorkQueue() workQueue { diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go new file mode 100644 index 00000000000..91c2a80cd9f --- /dev/null +++ b/libbeat/publisher/pipeline/monitoring.go @@ -0,0 +1,128 @@ +package pipeline + +import "github.com/elastic/beats/libbeat/monitoring" + +// observer is used by many 
components in the publisher pipeline, to report +// internal events. The observer can call registered global event handlers or +// update shared counters/metrics for reporting. +// All events required for reporting events/metrics on the pipeline-global level +// are defined by observer. The components are allowed to serve localized +// event-handlers only (e.g. the client centric events callbacks) +type observer struct { + metrics *monitoring.Registry + + // clients metrics + clients *monitoring.Uint + + // events publish/dropped stats + events, filtered, published, failed *monitoring.Uint + dropped, retry *monitoring.Uint // (retryer) drop/retry counters + activeEvents *monitoring.Uint + + // queue metrics + ackedQueue *monitoring.Uint +} + +func (o *observer) init(metrics *monitoring.Registry) { + o.metrics = metrics + reg := metrics.GetRegistry("pipeline") + if reg == nil { + reg = metrics.NewRegistry("pipeline") + } + + *o = observer{ + metrics: metrics, + clients: monitoring.NewUint(reg, "clients"), + + events: monitoring.NewUint(reg, "events.total"), + filtered: monitoring.NewUint(reg, "events.filtered"), + published: monitoring.NewUint(reg, "events.published"), + failed: monitoring.NewUint(reg, "events.failed"), + dropped: monitoring.NewUint(reg, "events.dropped"), + retry: monitoring.NewUint(reg, "events.retry"), + + ackedQueue: monitoring.NewUint(reg, "queue.acked"), + + activeEvents: monitoring.NewUint(reg, "events.active"), + } +} + +func (o *observer) cleanup() { + o.metrics.Remove("pipeline") // drop all metrics from registry +} + +// +// client connects/disconnects +// + +// (pipeline) pipeline did finish creating a new client instance +func (o *observer) clientConnected() { o.clients.Inc() } + +// (client) close being called on client +func (o *observer) clientClosing() {} + +// (client) client finished processing close +func (o *observer) clientClosed() { o.clients.Dec() } + +// +// client publish events +// + +// (client) client is trying to
publish a new event +func (o *observer) newEvent() { + o.events.Inc() + o.activeEvents.Inc() +} + +// (client) event is filtered out (on purpose or failed) +func (o *observer) filteredEvent() { + o.filtered.Inc() + o.activeEvents.Dec() +} + +// (client) managed to push an event into the publisher pipeline +func (o *observer) publishedEvent() { + o.published.Inc() +} + +// (client) client closing down or DropIfFull is set +func (o *observer) failedPublishEvent() { + o.failed.Inc() + o.activeEvents.Dec() +} + +// +// queue events +// + +// (queue) number of events ACKed by the queue/broker in use +func (o *observer) queueACKed(n int) { + o.ackedQueue.Add(uint64(n)) + o.activeEvents.Sub(uint64(n)) +} + +// +// pipeline output events +// + +// (controller) new output group is about to be loaded +func (o *observer) updateOutputGroup() {} + +// (retryer) new failed batch has been received +func (o *observer) eventsFailed(int) {} + +// (retryer) number of events dropped by retryer +func (o *observer) eventsDropped(n int) { + o.dropped.Add(uint64(n)) +} + +// (retryer) number of events pushed to the output worker queue +func (o *observer) eventsRetry(n int) { + o.retry.Add(uint64(n)) +} + +// (output) number of events to be forwarded to the output client +func (o *observer) outBatchSend(int) {} + +// (output) number of events acked by the output batch +func (o *observer) outBatchACKed(int) {} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 688d272f677..11f8e39990b 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -8,24 +8,26 @@ import ( // clientWorker manages output client of type outputs.Client, not supporting reconnect. type clientWorker struct { - qu workQueue - client outputs.Client - closed atomic.Bool + observer *observer + qu workQueue + client outputs.Client + closed atomic.Bool } // netClientWorker manages reconnectable output clients of type outputs.NetworkClient. 
type netClientWorker struct { - qu workQueue - client outputs.NetworkClient - closed atomic.Bool + observer *observer + qu workQueue + client outputs.NetworkClient + closed atomic.Bool batchSize int batchSizer func() int } -func makeClientWorker(qu workQueue, client outputs.Client) outputWorker { +func makeClientWorker(observer *observer, qu workQueue, client outputs.Client) outputWorker { if nc, ok := client.(outputs.NetworkClient); ok { - c := &netClientWorker{qu: qu, client: nc} + c := &netClientWorker{observer: observer, qu: qu, client: nc} go c.run() return c } @@ -42,6 +44,8 @@ func (w *clientWorker) Close() error { func (w *clientWorker) run() { for !w.closed.Load() { for batch := range w.qu { + w.observer.outBatchSend(len(batch.events)) + if err := w.client.Publish(batch); err != nil { return } @@ -77,6 +81,9 @@ func (w *netClientWorker) run() { // send loop for batch := range w.qu { if w.closed.Load() { + if batch != nil { + batch.Cancelled() + } return } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index aa8a441333e..1ed61c4b8d6 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -10,6 +10,8 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" + "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/processors" @@ -36,6 +38,8 @@ type Pipeline struct { broker broker.Broker output *outputController + observer observer + eventer pipelineEventer // wait close support @@ -108,6 +112,7 @@ type pipelineEventer struct { mutex sync.Mutex modifyable bool + observer *observer waitClose *waitCloser cb *pipelineEventCB } @@ -121,7 +126,11 @@ type brokerFactory func(broker.Eventer) (broker.Broker, error) // Load uses a Config object to create a new complete Pipeline instance with // configured broker and outputs. 
-func Load(beatInfo common.BeatInfo, config Config) (*Pipeline, error) { +func Load( + beatInfo common.BeatInfo, + monitoring *monitoring.Registry, + config Config, +) (*Pipeline, error) { if !config.Output.IsSet() { return nil, errors.New("no output configured") } @@ -130,13 +139,13 @@ func Load(beatInfo common.BeatInfo, config Config) (*Pipeline, error) { return broker.Load(e, config.Broker) } - output, err := outputs.Load(beatInfo, config.Output.Name(), config.Output.Config()) + output, err := outputs.Load(beatInfo, nil, config.Output.Name(), config.Output.Config()) if err != nil { return nil, err } // TODO: configure pipeline processors - pipeline, err := New(brokerFactory, output, Settings{ + pipeline, err := New(monitoring, brokerFactory, output, Settings{ WaitClose: config.WaitShutdown, WaitCloseMode: WaitOnPipelineClose, }) @@ -154,11 +163,11 @@ func Load(beatInfo common.BeatInfo, config Config) (*Pipeline, error) { // The new pipeline will take ownership of broker and outputs. On Close, the // broker and outputs will be closed. 
func New( + metrics *monitoring.Registry, brokerFactory brokerFactory, out outputs.Group, settings Settings, ) (*Pipeline, error) { - annotations := settings.Annotations var err error @@ -195,6 +204,8 @@ func New( } p.ackBuilder = &pipelineEmptyACK{p} p.ackActive = atomic.MakeBool(true) + + p.eventer.observer = &p.observer p.eventer.modifyable = true if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 { @@ -210,7 +221,9 @@ func New( } p.eventSema = newSema(p.broker.BufferConfig().Events) - p.output = newOutputController(log, p.broker) + p.observer.init(metrics) + + p.output = newOutputController(log, &p.observer, p.broker) p.output.Set(out) return p, nil @@ -292,6 +305,7 @@ func (p *Pipeline) Close() error { log.Err("pipeline broker shutdown error: ", err) } + p.observer.cleanup() return nil } @@ -375,10 +389,13 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { reportEvents: reportEvents, } + p.observer.clientConnected() return client, nil } func (e *pipelineEventer) OnACK(n int) { + e.observer.queueACKed(n) + if wc := e.waitClose; wc != nil { wc.dec(n) } diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index 18263115b6f..c9fab386f9a 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -13,8 +13,10 @@ import ( // will the consumer be paused, until some batches have been processed by some // outputs. 
type retryer struct { - logger *logp.Logger - done chan struct{} + logger *logp.Logger + observer *observer + + done chan struct{} consumer *eventConsumer @@ -50,9 +52,15 @@ const ( cancelledBatch ) -func newRetryer(log *logp.Logger, out workQueue, c *eventConsumer) *retryer { +func newRetryer( + log *logp.Logger, + observer *observer, + out workQueue, + c *eventConsumer, +) *retryer { r := &retryer{ logger: log, + observer: observer, done: make(chan struct{}), sig: make(chan retryerSignal, 3), in: retryQueue(make(chan batchEvent, 3)), @@ -93,9 +101,10 @@ func (r *retryer) cancelled(b *Batch) { func (r *retryer) loop() { var ( out workQueue - active *Batch consumerBlocked bool + active *Batch + activeSize int buffer []*Batch numOutputs int @@ -108,9 +117,22 @@ func (r *retryer) loop() { return case evt := <-r.in: - batch := evt.batch + var ( + countFailed int + countDropped int + batch = evt.batch + countRetry = len(batch.events) + ) + if evt.tag == retryBatch { + countFailed = len(batch.events) + r.observer.eventsFailed(countFailed) + decBatch(batch) + + countRetry = len(batch.events) + countDropped = countFailed - countRetry + r.observer.eventsDropped(countDropped) } if len(batch.events) == 0 { @@ -121,6 +143,7 @@ func (r *retryer) loop() { buffer = append(buffer, batch) out = r.out active = buffer[0] + activeSize = len(active.events) if !consumerBlocked { consumerBlocked = blockConsumer(numOutputs, len(buffer)) if consumerBlocked { @@ -132,13 +155,16 @@ func (r *retryer) loop() { } case out <- active: + r.observer.eventsRetry(activeSize) + buffer = buffer[1:] - active = nil + active, activeSize = nil, 0 if len(buffer) == 0 { out = nil } else { active = buffer[0] + activeSize = len(active.events) } if consumerBlocked {