[refactor] - s3 metrics (#3760)
* use histogram

* use exponential
ahrav authored Dec 11, 2024
1 parent c24b4d5 commit 3525c6f
Showing 2 changed files with 26 additions and 22 deletions.
32 changes: 18 additions & 14 deletions pkg/sources/s3/metrics.go
@@ -11,8 +11,8 @@ import (
 type metricsCollector interface {
     // Object metrics.

-    RecordObjectScanned(bucket string)
-    RecordObjectSkipped(bucket, reason string)
+    RecordObjectScanned(bucket string, sizeBytes float64)
+    RecordObjectSkipped(bucket, reason string, sizeBytes float64)
     RecordObjectError(bucket string)

     // Role metrics.
@@ -22,8 +22,8 @@ type metricsCollector interface {
 }

 type collector struct {
-    objectsScanned *prometheus.CounterVec
-    objectsSkipped *prometheus.CounterVec
+    objectsScanned *prometheus.HistogramVec
+    objectsSkipped *prometheus.HistogramVec
     objectsErrors  *prometheus.CounterVec
     rolesScanned   *prometheus.GaugeVec
     bucketsPerRole *prometheus.GaugeVec
@@ -33,18 +33,22 @@ var metricsInstance metricsCollector

 func init() {
     metricsInstance = &collector{
-        objectsScanned: promauto.NewCounterVec(prometheus.CounterOpts{
+        objectsScanned: promauto.NewHistogramVec(prometheus.HistogramOpts{
             Namespace: common.MetricsNamespace,
             Subsystem: common.MetricsSubsystem,
-            Name:      "objects_scanned_total",
-            Help:      "Total number of S3 objects successfully scanned",
+            Name:      "objects_scanned_bytes",
+            Help:      "Size distribution of successfully scanned S3 objects in bytes",
+            // 64B, 512B, 4KB, 32KB, 256KB, 2MB, 16MB, 128MB, 1GB.
+            Buckets: prometheus.ExponentialBuckets(64, 8, 9),
         }, []string{"bucket"}),

-        objectsSkipped: promauto.NewCounterVec(prometheus.CounterOpts{
+        objectsSkipped: promauto.NewHistogramVec(prometheus.HistogramOpts{
             Namespace: common.MetricsNamespace,
             Subsystem: common.MetricsSubsystem,
-            Name:      "objects_skipped_total",
-            Help:      "Total number of S3 objects skipped during scan",
+            Name:      "objects_skipped_bytes",
+            Help:      "Size distribution of skipped S3 objects in bytes",
+            // 64B, 512B, 4KB, 32KB, 256KB, 2MB, 16MB, 128MB, 1GB.
+            Buckets: prometheus.ExponentialBuckets(64, 8, 9),
         }, []string{"bucket", "reason"}),

         objectsErrors: promauto.NewCounterVec(prometheus.CounterOpts{
@@ -70,12 +74,12 @@ func init() {
     }
 }

-func (c *collector) RecordObjectScanned(bucket string) {
-    c.objectsScanned.WithLabelValues(bucket).Inc()
+func (c *collector) RecordObjectScanned(bucket string, sizeBytes float64) {
+    c.objectsScanned.WithLabelValues(bucket).Observe(sizeBytes)
 }

-func (c *collector) RecordObjectSkipped(bucket, reason string) {
-    c.objectsSkipped.WithLabelValues(bucket, reason).Inc()
+func (c *collector) RecordObjectSkipped(bucket, reason string, sizeBytes float64) {
+    c.objectsSkipped.WithLabelValues(bucket, reason).Observe(sizeBytes)
 }

 func (c *collector) RecordObjectError(bucket string) {
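Note on the bucket layout (context, not part of the diff): prometheus.ExponentialBuckets(64, 8, 9) returns nine upper bounds starting at 64 and multiplying by 8 each step, which is exactly the 64B through 1GB progression listed in the comment; anything larger than 1GB falls into the implicit +Inf bucket. A minimal standalone sketch, assuming only the standard client_golang package, that prints the boundaries:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// ExponentialBuckets(start, factor, count) yields count upper bounds:
	// start, start*factor, start*factor^2, ...
	for _, b := range prometheus.ExponentialBuckets(64, 8, 9) {
		fmt.Printf("%.0f bytes\n", b) // 64, 512, 4096, ..., 1073741824 (1 GiB)
	}
}
```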
16 changes: 8 additions & 8 deletions pkg/sources/s3/s3.go
@@ -432,7 +432,7 @@ func (s *Source) pageChunker(

     for objIdx, obj := range metadata.page.Contents {
         if obj == nil {
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "nil_object")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "nil_object", 0)
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for nil object")
             }
@@ -448,7 +448,7 @@ func (s *Source) pageChunker(
         // Skip GLACIER and GLACIER_IR objects.
         if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
             ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass)
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class", float64(*obj.Size))
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for glacier object")
             }
@@ -458,7 +458,7 @@ func (s *Source) pageChunker(
         // Ignore large files.
         if *obj.Size > s.maxObjectSize {
             ctx.Logger().V(5).Info("Skipping %d byte file (over maxObjectSize limit)")
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit", float64(*obj.Size))
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for large file")
             }
@@ -468,7 +468,7 @@ func (s *Source) pageChunker(
         // File empty file.
         if *obj.Size == 0 {
             ctx.Logger().V(5).Info("Skipping empty file")
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file", 0)
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for empty file")
             }
@@ -478,7 +478,7 @@ func (s *Source) pageChunker(
         // Skip incompatible extensions.
         if common.SkipFile(*obj.Key) {
             ctx.Logger().V(5).Info("Skipping file with incompatible extension")
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension", float64(*obj.Size))
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for incompatible file")
             }
@@ -493,7 +493,7 @@ func (s *Source) pageChunker(

             if strings.HasSuffix(*obj.Key, "/") {
                 ctx.Logger().V(5).Info("Skipping directory")
-                s.metricsCollector.RecordObjectSkipped(metadata.bucket, "directory")
+                s.metricsCollector.RecordObjectSkipped(metadata.bucket, "directory", float64(*obj.Size))
                 return nil
             }

@@ -521,7 +521,7 @@ func (s *Source) pageChunker(
             if err != nil {
                 if strings.Contains(err.Error(), "AccessDenied") {
                     ctx.Logger().Error(err, "could not get S3 object; access denied")
-                    s.metricsCollector.RecordObjectSkipped(metadata.bucket, "access_denied")
+                    s.metricsCollector.RecordObjectSkipped(metadata.bucket, "access_denied", float64(*obj.Size))
                 } else {
                     ctx.Logger().Error(err, "could not get S3 object")
                     s.metricsCollector.RecordObjectError(metadata.bucket)
@@ -596,7 +596,7 @@ func (s *Source) pageChunker(
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for scanned object")
             }
-            s.metricsCollector.RecordObjectScanned(metadata.bucket)
+            s.metricsCollector.RecordObjectScanned(metadata.bucket, float64(*obj.Size))

             return nil
         })
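Why the switch from counters to histograms loses nothing (an observation about standard Prometheus semantics, not part of the diff): every Observe call still bumps the histogram's implicit sample count, so the old per-bucket object totals remain available from the metric's _count series (for example sum(rate(<namespace>_<subsystem>_objects_scanned_bytes_count[5m])) in PromQL, with the prefix determined by common.MetricsNamespace and common.MetricsSubsystem), while _sum and the _bucket series add the size distribution. A small self-contained sketch, assuming only client_golang and client_model and a hypothetical metric name, showing that both count and sum are captured:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "objects_scanned_bytes_example", // hypothetical name, for illustration only
		Buckets: prometheus.ExponentialBuckets(64, 8, 9),
	})

	// Each Observe plays the role the old Inc() played (it increments the
	// sample count) and additionally records the object's size.
	h.Observe(1500)   // e.g. a 1.5 KB object
	h.Observe(300000) // e.g. a 300 KB object

	var m dto.Metric
	_ = h.Write(&m)
	fmt.Println("count:", m.GetHistogram().GetSampleCount()) // 2 — the old counter value
	fmt.Println("sum:  ", m.GetHistogram().GetSampleSum())   // 301500 bytes
}
```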
