From 5ed9f16a4e51c4512c73689a5af48b90bada5633 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Wed, 25 Jan 2023 07:11:02 +1030 Subject: [PATCH] filebeat/input/tcp: add support for TCP input metrics (#34333) * convert to v2 input * add input metrics support --- CHANGELOG.next.asciidoc | 1 + filebeat/docs/inputs/input-tcp.asciidoc | 18 ++ filebeat/include/list.go | 1 - filebeat/input/default-inputs/inputs.go | 2 + filebeat/input/tcp/config.go | 47 ---- filebeat/input/tcp/input.go | 323 ++++++++++++++++++------ filebeat/input/tcp/input_test.go | 55 ---- filebeat/input/udp/input.go | 4 +- 8 files changed, 267 insertions(+), 184 deletions(-) delete mode 100644 filebeat/input/tcp/config.go delete mode 100644 filebeat/input/tcp/input_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 08304b740e2..4f145417d10 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014] - Add support for polling system UDP stats for UDP input metrics. {pull}34070[34070] - Add support for recognizing the log level in Elasticsearch JVM logs {pull}34159[34159] +- Add metrics for TCP packet processing. {pull}34333[34333] *Auditbeat* diff --git a/filebeat/docs/inputs/input-tcp.asciidoc b/filebeat/docs/inputs/input-tcp.asciidoc index b373a36d96c..93a82ae1a14 100644 --- a/filebeat/docs/inputs/input-tcp.asciidoc +++ b/filebeat/docs/inputs/input-tcp.asciidoc @@ -27,6 +27,24 @@ The `tcp` input supports the following configuration options plus the include::../inputs/input-common-tcp-options.asciidoc[] +[float] +=== Metrics + +This input exposes metrics under the <>. +These metrics are exposed under the `/dataset` path. They can be used to +observe the activity of the input. + +[options="header"] +|======= +| Metric | Description +| `device` | Host/port of the TCP stream. +| `received_events_total` | Total number of packets (events) that have been received. +| `received_bytes_total` | Total number of bytes received. +| `receive_queue_length` | Size of the system receive queue (linux only) (guage). +| `arrival_period` | Histogram of the time between successive packets in nanoseconds. +| `processing_time` | Histogram of the time taken to process packets in nanoseconds. +|======= + [id="{beatname_lc}-input-{type}-common-options"] include::../inputs/input-common-options.asciidoc[] diff --git a/filebeat/include/list.go b/filebeat/include/list.go index eda7cb7bf67..393a5097a80 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -27,7 +27,6 @@ import ( _ "github.com/elastic/beats/v7/filebeat/input/redis" _ "github.com/elastic/beats/v7/filebeat/input/stdin" _ "github.com/elastic/beats/v7/filebeat/input/syslog" - _ "github.com/elastic/beats/v7/filebeat/input/tcp" _ "github.com/elastic/beats/v7/filebeat/module/apache" _ "github.com/elastic/beats/v7/filebeat/module/auditd" _ "github.com/elastic/beats/v7/filebeat/module/elasticsearch" diff --git a/filebeat/input/default-inputs/inputs.go b/filebeat/input/default-inputs/inputs.go index fb580179bb9..4b0c86f6a0d 100644 --- a/filebeat/input/default-inputs/inputs.go +++ b/filebeat/input/default-inputs/inputs.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/filebeat/input/filestream" "github.com/elastic/beats/v7/filebeat/input/kafka" + "github.com/elastic/beats/v7/filebeat/input/tcp" "github.com/elastic/beats/v7/filebeat/input/udp" "github.com/elastic/beats/v7/filebeat/input/unix" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -39,6 +40,7 @@ func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin { return []v2.Plugin{ filestream.Plugin(log, components), kafka.Plugin(), + tcp.Plugin(), udp.Plugin(), unix.Plugin(), } diff --git a/filebeat/input/tcp/config.go b/filebeat/input/tcp/config.go deleted file mode 100644 index 36e72aa8e44..00000000000 --- a/filebeat/input/tcp/config.go +++ /dev/null @@ -1,47 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package tcp - -import ( - "time" - - "github.com/dustin/go-humanize" - - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/inputsource/common/streaming" - "github.com/elastic/beats/v7/filebeat/inputsource/tcp" -) - -type config struct { - tcp.Config `config:",inline"` - harvester.ForwarderConfig `config:",inline"` - - LineDelimiter string `config:"line_delimiter" validate:"nonzero"` - Framing streaming.FramingType `config:"framing"` -} - -var defaultConfig = config{ - ForwarderConfig: harvester.ForwarderConfig{ - Type: "tcp", - }, - Config: tcp.Config{ - Timeout: time.Minute * 5, - MaxMessageSize: 20 * humanize.MiByte, - }, - LineDelimiter: "\n", -} diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 0486f7310af..97fa1bf7db6 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -18,128 +18,293 @@ package tcp import ( - "sync" + "bytes" + "encoding/binary" + "errors" + "fmt" + "net" + "os" + "runtime" + "strconv" + "strings" "time" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input" + "github.com/dustin/go-humanize" + "github.com/rcrowley/go-metrics" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/common/streaming" "github.com/elastic/beats/v7/filebeat/inputsource/tcp" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" + "github.com/elastic/go-concert/ctxtool" ) -func init() { - err := input.Register("tcp", NewInput) - if err != nil { - panic(err) +func Plugin() input.Plugin { + return input.Plugin{ + Name: "tcp", + Stability: feature.Stable, + Deprecated: false, + Info: "tcp packet server", + Manager: stateless.NewInputManager(configure), } } -// Input for TCP connection -type Input struct { - mutex sync.Mutex - server *tcp.Server - started bool - outlet channel.Outleter - config *config - log *logp.Logger +func configure(cfg *conf.C) (stateless.Input, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + return newServer(config) } -// NewInput creates a new TCP input -func NewInput( - cfg *conf.C, - connector channel.Connector, - context input.Context, -) (input.Input, error) { - out, err := connector.Connect(cfg) - if err != nil { - return nil, err +func defaultConfig() config { + return config{ + Config: tcp.Config{ + Timeout: time.Minute * 5, + MaxMessageSize: 20 * humanize.MiByte, + }, + LineDelimiter: "\n", } +} + +type server struct { + tcp.Server + config +} + +type config struct { + tcp.Config `config:",inline"` + + LineDelimiter string `config:"line_delimiter" validate:"nonzero"` + Framing streaming.FramingType `config:"framing"` +} - forwarder := harvester.NewForwarder(out) +func newServer(config config) (*server, error) { + return &server{config: config}, nil +} + +func (s *server) Name() string { return "tcp" } - config := defaultConfig - err = cfg.Unpack(&config) +func (s *server) Test(_ input.TestContext) error { + l, err := net.Listen("tcp", s.config.Config.Host) if err != nil { - return nil, err + return err } + return l.Close() +} - cb := func(data []byte, metadata inputsource.NetworkMetadata) { - event := createEvent(data, metadata) - _ = forwarder.Send(event) - } +func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { + log := ctx.Logger.With("host", s.config.Config.Host) + + log.Info("starting tcp socket input") + defer log.Info("tcp input stopped") + + const pollInterval = time.Minute + metrics := newInputMetrics(ctx.ID, s.config.Host, pollInterval, log) + defer metrics.close() - splitFunc, err := streaming.SplitFunc(config.Framing, []byte(config.LineDelimiter)) + split, err := streaming.SplitFunc(s.config.Framing, []byte(s.config.LineDelimiter)) if err != nil { - return nil, err + return err } - logger := logp.NewLogger("input.tcp").With("address", config.Config.Host) - factory := streaming.SplitHandlerFactory(inputsource.FamilyTCP, logger, tcp.MetadataCallback, cb, splitFunc) + server, err := tcp.New(&s.config.Config, streaming.SplitHandlerFactory( + inputsource.FamilyTCP, log, tcp.MetadataCallback, func(data []byte, metadata inputsource.NetworkMetadata) { + evt := beat.Event{ + Timestamp: time.Now(), + Fields: mapstr.M{ + "message": string(data), + }, + } + if metadata.RemoteAddr != nil { + evt.Fields["log"] = mapstr.M{ + "source": mapstr.M{ + "address": metadata.RemoteAddr.String(), + }, + } + } + + publisher.Publish(evt) - server, err := tcp.New(&config.Config, factory) + // This must be called after publisher.Publish to measure + // the processing time metric. + metrics.log(data, evt.Timestamp) + }, + split, + )) if err != nil { - return nil, err + return err } - return &Input{ - server: server, - started: false, - outlet: out, - config: &config, - log: logger, - }, nil + log.Debug("tcp input initialized") + + err = server.Run(ctxtool.FromCanceller(ctx.Cancelation)) + // Ignore error from 'Run' in case shutdown was signaled. + if ctxerr := ctx.Cancelation.Err(); ctxerr != nil { + err = ctxerr + } + return err } -// Run start a TCP input -func (p *Input) Run() { - p.mutex.Lock() - defer p.mutex.Unlock() +// inputMetrics handles the input's metric reporting. +type inputMetrics struct { + unregister func() + done chan struct{} + + lastPacket time.Time - if !p.started { - p.log.Info("Starting TCP input") - err := p.server.Start() + device *monitoring.String // name of the device being monitored + packets *monitoring.Uint // number of packets processed + bytes *monitoring.Uint // number of bytes processed + rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp (only on linux systems) + arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals + processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication +} + +// newInputMetrics returns an input metric for the TCP processor. If id is empty +// a nil inputMetric is returned. +func newInputMetrics(id, device string, poll time.Duration, log *logp.Logger) *inputMetrics { + if id == "" { + return nil + } + reg, unreg := inputmon.NewInputRegistry("tcp", id, nil) + out := &inputMetrics{ + unregister: unreg, + device: monitoring.NewString(reg, "device"), + packets: monitoring.NewUint(reg, "received_events_total"), + bytes: monitoring.NewUint(reg, "received_bytes_total"), + rxQueue: monitoring.NewUint(reg, "receive_queue_length"), + arrivalPeriod: metrics.NewUniformSample(1024), + processingTime: metrics.NewUniformSample(1024), + } + _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) + _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.processingTime)) + + out.device.Set(device) + + if poll > 0 && runtime.GOOS == "linux" { + host, port, ok := strings.Cut(device, ":") + if !ok { + log.Warnf("failed to get address for %s: no port separator", device) + return out + } + ip, err := net.LookupIP(host) if err != nil { - p.log.Errorw("Error starting the TCP server", "error", err) + log.Warnf("failed to get address for %s: %v", device, err) + return out } - p.started = true + p, err := strconv.ParseInt(port, 10, 16) + if err != nil { + log.Warnf("failed to get port for %s: %v", device, err) + return out + } + ph := strconv.FormatInt(p, 16) + addr := make([]string, 0, len(ip)) + for _, p := range ip { + p4 := p.To4() + if len(p4) != net.IPv4len { + continue + } + addr = append(addr, fmt.Sprintf("%X:%s", binary.LittleEndian.Uint32(p4), ph)) + } + out.done = make(chan struct{}) + go out.poll(addr, poll, log) } -} -// Stop stops TCP server -func (p *Input) Stop() { - defer p.outlet.Close() - p.mutex.Lock() - defer p.mutex.Unlock() + return out +} - p.log.Info("Stopping TCP input") - p.server.Stop() - p.started = false +// log logs metric for the given packet. +func (m *inputMetrics) log(data []byte, timestamp time.Time) { + if m == nil { + return + } + m.processingTime.Update(time.Since(timestamp).Nanoseconds()) + m.packets.Add(1) + m.bytes.Add(uint64(len(data))) + if !m.lastPacket.IsZero() { + m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) + } + m.lastPacket = timestamp } -// Wait stop the current server -func (p *Input) Wait() { - p.Stop() +// poll periodically gets TCP buffer stats from the OS. +func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) { + t := time.NewTicker(each) + for { + select { + case <-t.C: + rx, err := procNetTCP(addr) + if err != nil { + log.Warnf("failed to get tcp stats from /proc: %v", err) + continue + } + m.rxQueue.Set(uint64(rx)) + case <-m.done: + t.Stop() + return + } + } } -func createEvent(raw []byte, metadata inputsource.NetworkMetadata) beat.Event { - evt := beat.Event{ - Timestamp: time.Now(), - Fields: mapstr.M{ - "message": string(raw), - }, +// procNetTCP returns the rx_queue field of the TCP socket table for the +// socket on the provided address formatted in hex, xxxxxxxx:xxxx. +// This function is only useful on linux due to its dependence on the /proc +// filesystem, but is kept in this file for simplicity. +func procNetTCP(addr []string) (rx int64, err error) { + b, err := os.ReadFile("/proc/net/tcp") + if err != nil { + return 0, err } - if metadata.RemoteAddr != nil { - evt.Fields["log"] = mapstr.M{ - "source": mapstr.M{ - "address": metadata.RemoteAddr.String(), - }, + lines := bytes.Split(b, []byte("\n")) + if len(lines) < 2 { + return 0, fmt.Errorf("/proc/net/tcp entry not found for %s (no line)", addr) + } + for _, l := range lines[1:] { + f := bytes.Fields(l) + if contains(f[1], addr) { + _, r, ok := bytes.Cut(f[4], []byte(":")) + if !ok { + return 0, errors.New("no rx_queue field " + string(f[4])) + } + rx, err = strconv.ParseInt(string(r), 16, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse rx_queue: %w", err) + } + return rx, nil } } - return evt + return 0, fmt.Errorf("/proc/net/tcp entry not found for %s", addr) +} + +func contains(b []byte, addr []string) bool { + for _, a := range addr { + if strings.EqualFold(string(b), a) { + return true + } + } + return false +} + +func (m *inputMetrics) close() { + if m == nil { + return + } + if m.done != nil { + // Shut down poller and wait until done before unregistering metrics. + m.done <- struct{}{} + } + m.unregister() } diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/tcp/input_test.go deleted file mode 100644 index 4133de6b334..00000000000 --- a/filebeat/input/tcp/input_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package tcp - -import ( - "net" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/filebeat/input/inputtest" - "github.com/elastic/beats/v7/filebeat/inputsource" - "github.com/elastic/elastic-agent-libs/mapstr" -) - -func TestCreateEvent(t *testing.T) { - hello := "hello world" - ip := "127.0.0.1" - parsedIP := net.ParseIP(ip) - addr := &net.IPAddr{IP: parsedIP, Zone: ""} - - message := []byte(hello) - mt := inputsource.NetworkMetadata{RemoteAddr: addr} - - event := createEvent(message, mt) - - m, err := event.GetValue("message") - assert.NoError(t, err) - assert.Equal(t, string(message), m) - - from, _ := event.GetValue("log.source.address") - assert.Equal(t, ip, from) -} - -func TestNewInputDone(t *testing.T) { - config := mapstr.M{ - "host": ":0", - } - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) -} diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 77e57820ea4..2dd75c47b8e 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -36,7 +36,7 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" - "github.com/elastic/beats/v7/libbeat/beat" // TODO: Replace with sync/atomic when go1.19 is supported. + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" conf "github.com/elastic/elastic-agent-libs/config" @@ -129,7 +129,7 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { publisher.Publish(evt) - // This must be called after forwarder.Send to measure + // This must be called after publisher.Publish to measure // the processing time metric. metrics.log(data, evt.Timestamp) })