Skip to content

Commit

Permalink
review change - rename and accessor
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
Kavindu-Dodan committed Nov 25, 2024
1 parent 5e496fd commit 91c7d87
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
25 changes: 13 additions & 12 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,25 +206,26 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
return out
}

// monitoredReader implements io.Reader and counts the number of bytes read.
// monitoredReader implements io.Reader and wraps byte read tracking fields for S3 bucket objects.
// Following are the tracked metrics,
// - totalBytesReadMetric - a total metric tracking bytes reads throughout the runtime from all processed objects
// - totalBytesReadCurrent - total bytes read from the currently tracked object
//
// See newMonitoredReader for initialization considerations.
type monitoredReader struct {
reader io.Reader
totalBytesRead *monitoring.Uint

trackBytes int64
reader io.Reader
totalBytesReadMetric *monitoring.Uint
totalBytesReadCurrent int64
}

// newMonitoredReader initialize the monitoredReader with a shared monitor that tracks all bytes read.
func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader {
return &monitoredReader{reader: r, totalBytesRead: metric}
return &monitoredReader{reader: r, totalBytesReadMetric: metric}
}

func (m *monitoredReader) Read(p []byte) (int, error) {
n, err := m.reader.Read(p)
m.totalBytesRead.Add(uint64(n))
m.trackBytes += int64(n)
m.totalBytesReadMetric.Add(uint64(n))
m.totalBytesReadCurrent += int64(n)
return n, err
}

func (m *monitoredReader) GetTrackedBytes() int64 {
return m.trackBytes
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
}

// finally obtain total bytes of the object through metered reader
p.metrics.s3ObjectSizeInBytes.Update(mReader.GetTrackedBytes())
p.metrics.s3ObjectSizeInBytes.Update(mReader.totalBytesReadCurrent)

return nil
}
Expand Down

0 comments on commit 91c7d87

Please sign in to comment.