From 91c7d87f20f8efa8797803504af816baefd4a0bb Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Mon, 25 Nov 2024 07:41:57 -0800 Subject: [PATCH] review change - rename and accessor Signed-off-by: Kavindu Dodanduwa --- x-pack/filebeat/input/awss3/metrics.go | 25 ++++++++++++----------- x-pack/filebeat/input/awss3/s3_objects.go | 2 +- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 74ac2d14a9a..0ebcaeb1a92 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -206,25 +206,26 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers return out } -// monitoredReader implements io.Reader and counts the number of bytes read. +// monitoredReader implements io.Reader and wraps byte read tracking fields for S3 bucket objects. +// Following are the tracked metrics, +// - totalBytesReadMetric - a total metric tracking bytes reads throughout the runtime from all processed objects +// - totalBytesReadCurrent - total bytes read from the currently tracked object +// +// See newMonitoredReader for initialization considerations. type monitoredReader struct { - reader io.Reader - totalBytesRead *monitoring.Uint - - trackBytes int64 + reader io.Reader + totalBytesReadMetric *monitoring.Uint + totalBytesReadCurrent int64 } +// newMonitoredReader initialize the monitoredReader with a shared monitor that tracks all bytes read. func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader { - return &monitoredReader{reader: r, totalBytesRead: metric} + return &monitoredReader{reader: r, totalBytesReadMetric: metric} } func (m *monitoredReader) Read(p []byte) (int, error) { n, err := m.reader.Read(p) - m.totalBytesRead.Add(uint64(n)) - m.trackBytes += int64(n) + m.totalBytesReadMetric.Add(uint64(n)) + m.totalBytesReadCurrent += int64(n) return n, err } - -func (m *monitoredReader) GetTrackedBytes() int64 { - return m.trackBytes -} diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 9168eabed7a..acd4d173439 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -213,7 +213,7 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func } // finally obtain total bytes of the object through metered reader - p.metrics.s3ObjectSizeInBytes.Update(mReader.GetTrackedBytes()) + p.metrics.s3ObjectSizeInBytes.Update(mReader.totalBytesReadCurrent) return nil }