From 6557b3b3214747ee59a825cb3df2a91281b82a41 Mon Sep 17 00:00:00 2001 From: ahrav Date: Thu, 8 Feb 2024 07:38:40 -0800 Subject: [PATCH] [feat] - buffered file writer metrics (#2395) * use diff chan * correctly use the buffered file writer * use value from source * reorder fields * add tests and update * Fix issue with buffer slices growing * fix test * correctly use the buffered file writer * use value from source * reorder fields * fix * add singleton * use shared pool * optimize * rename and cleanup * add metrics * add print * rebase * remove extra inc * add metrics for checkout time * add comment * use microseconds * add metrics * add metrics pkg * add more metrics * rever test * remove fields * fix * resize and return * update metric name * remove comment * address comments * add comment --- .../bufferedfilewriter.go | 111 ++++++++++++++--- pkg/writers/buffered_file_writer/metrics.go | 117 ++++++++++++++++++ 2 files changed, 212 insertions(+), 16 deletions(-) create mode 100644 pkg/writers/buffered_file_writer/metrics.go diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 09a3c0396c54..088cc3ff8bd7 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -8,19 +8,51 @@ import ( "io" "os" "sync" + "time" "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" "github.com/trufflesecurity/trufflehog/v3/pkg/context" ) +type bufferPoolMetrics struct{} + +func (bufferPoolMetrics) recordGrowth(growthAmount int) { + growCount.Inc() + growAmount.Add(float64(growthAmount)) +} + +func (bufferPoolMetrics) recordShrink(amount int) { + shrinkCount.Inc() + shrinkAmount.Add(float64(amount)) +} + +func (bufferPoolMetrics) recordCheckoutDuration(duration time.Duration) { + checkoutDuration.Observe(float64(duration.Microseconds())) + checkoutCount.Inc() + checkoutDurationTotal.Add(float64(duration.Microseconds())) +} + +func (bufferPoolMetrics) recordBufferRetrival() { + activeBufferCount.Inc() + bufferCount.Inc() +} + +func (bufferPoolMetrics) recordBufferReturn(bufCap, bufLen int64) { + activeBufferCount.Dec() + totalBufferSize.Add(float64(bufCap)) + totalBufferLength.Add(float64(bufLen)) +} + type bufPoolOpt func(pool *bufferPool) type bufferPool struct { bufferSize uint32 *sync.Pool + + metrics bufferPoolMetrics } -const defaultBufferSize = 2 << 10 // 2KB +const defaultBufferSize = 1 << 12 // 4KB func newBufferPool(opts ...bufPoolOpt) *bufferPool { pool := &bufferPool{bufferSize: defaultBufferSize} @@ -29,8 +61,7 @@ func newBufferPool(opts ...bufPoolOpt) *bufferPool { } pool.Pool = &sync.Pool{ New: func() any { - buf := new(bytes.Buffer) - buf.Grow(int(pool.bufferSize)) + buf := &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))} return buf }, } @@ -44,22 +75,40 @@ var sharedBufferPool *bufferPool func init() { sharedBufferPool = newBufferPool() } -func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { - buf, ok := bp.Pool.Get().(*bytes.Buffer) +// buffer is a wrapper around bytes.Buffer that includes a timestamp for tracking buffer checkout duration. +type buffer struct { + *bytes.Buffer + checkedOut time.Time +} + +func (bp *bufferPool) get(ctx context.Context) *buffer { + buf, ok := bp.Pool.Get().(*buffer) if !ok { ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") - buf = bytes.NewBuffer(make([]byte, 0, bp.bufferSize)) + buf = &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, bp.bufferSize))} } + buf.checkedOut = time.Now() + bp.metrics.recordBufferRetrival() return buf } -func (bp *bufferPool) put(buf *bytes.Buffer) { - // If the buffer is more than twice the default size, release it for garbage collection. +func (bp *bufferPool) growBufferWithSize(buf *buffer, size int) { + // Grow the buffer to accommodate the new data. + bp.metrics.recordGrowth(size) + buf.Grow(size) +} + +func (bp *bufferPool) put(buf *buffer) { + bp.metrics.recordBufferReturn(int64(buf.Cap()), int64(buf.Len())) + bp.metrics.recordCheckoutDuration(time.Since(buf.checkedOut)) + + // If the buffer is more than twice the default size, replace it with a new buffer. // This prevents us from returning very large buffers to the pool. const maxAllowedCapacity = 2 * defaultBufferSize if buf.Cap() > maxAllowedCapacity { - buf = nil // Release the large buffer for garbage collection. + bp.metrics.recordShrink(buf.Cap() - defaultBufferSize) + buf = &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, bp.bufferSize))} } else { // Reset the buffer to clear any existing data. buf.Reset() @@ -68,6 +117,23 @@ func (bp *bufferPool) put(buf *bytes.Buffer) { bp.Put(buf) } +type bufferedFileWriterMetrics struct{} + +func (bufferedFileWriterMetrics) recordDataProcessed(size uint64, dur time.Duration) { + totalWriteSize.Add(float64(size)) + totalWriteDuration.Add(float64(dur.Microseconds())) +} + +func (bufferedFileWriterMetrics) recordDiskWrite(ctx context.Context, f *os.File) { + diskWriteCount.Inc() + size, err := f.Stat() + if err != nil { + ctx.Logger().Error(err, "failed to get file size for metric") + return + } + fileSizeHistogram.Observe(float64(size.Size())) +} + // state represents the current mode of BufferedFileWriter. type state uint8 @@ -84,12 +150,14 @@ type BufferedFileWriter struct { threshold uint64 // Threshold for switching to file writing. size uint64 // Total size of the data written. - state state // Current state of the writer. (writeOnly or readOnly) - bufPool *bufferPool // Pool for storing buffers for reuse. - buf *bytes.Buffer // Buffer for storing data under the threshold in memory. + buf *buffer // Buffer for storing data under the threshold in memory. filename string // Name of the temporary file. file io.WriteCloser // File for storing data over the threshold. + + state state // Current state of the writer. (writeOnly or readOnly) + + metrics bufferedFileWriterMetrics } // Option is a function that modifies a BufferedFileWriter. @@ -100,9 +168,9 @@ func WithThreshold(threshold uint64) Option { return func(w *BufferedFileWriter) { w.threshold = threshold } } +const defaultThreshold = 10 * 1024 * 1024 // 10MB // New creates a new BufferedFileWriter with the given options. func New(opts ...Option) *BufferedFileWriter { - const defaultThreshold = 10 * 1024 * 1024 // 10MB w := &BufferedFileWriter{ threshold: defaultThreshold, state: writeOnly, @@ -111,6 +179,7 @@ func New(opts ...Option) *BufferedFileWriter { for _, opt := range opts { opt(w) } + return w } @@ -158,7 +227,10 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error bufferLength := w.buf.Len() - defer func() { + start := time.Now() + defer func(start time.Time) { + w.metrics.recordDataProcessed(size, time.Since(start)) + w.size += size ctx.Logger().V(4).Info( "write complete", @@ -166,9 +238,9 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error "content_size", bufferLength, "total_size", w.size, ) - }() + }(start) - totalSizeNeeded := uint64(bufferLength) + uint64(len(data)) + totalSizeNeeded := uint64(bufferLength) + size if totalSizeNeeded <= w.threshold { // If the total size is within the threshold, write to the buffer. ctx.Logger().V(4).Info( @@ -187,6 +259,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error "available_space", availableSpace, "grow_size", growSize, ) + // We are manually growing the buffer so we can track the growth via metrics. + // Knowing the exact data size, we directly resize to fit it, rather than exponential growth + // which may require multiple allocations and copies if the size required is much larger + // than double the capacity. Our approach aligns with default behavior when growth sizes + // happen to match current capacity, retaining asymptotic efficiency benefits. + w.bufPool.growBufferWithSize(w.buf, growSize) } return w.buf.Write(data) @@ -202,6 +280,7 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error w.filename = file.Name() w.file = file + w.metrics.recordDiskWrite(ctx, file) // Transfer existing data in buffer to the file, then clear the buffer. // This ensures all the data is in one place - either entirely in the buffer or the file. diff --git a/pkg/writers/buffered_file_writer/metrics.go b/pkg/writers/buffered_file_writer/metrics.go new file mode 100644 index 000000000000..a940090e5def --- /dev/null +++ b/pkg/writers/buffered_file_writer/metrics.go @@ -0,0 +1,117 @@ +package bufferedfilewriter + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/trufflesecurity/trufflehog/v3/pkg/common" +) + +var ( + activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "active_buffer_count", + Help: "Current number of active buffers.", + }) + + bufferCount = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "buffer_count", + Help: "Total number of buffers managed by the pool.", + }) + + totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "total_buffer_length", + Help: "Total length of all buffers combined.", + }) + + totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "total_buffer_size", + Help: "Total size of all buffers combined.", + }) + + growCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "grow_count", + Help: "Total number of times buffers in the pool have grown.", + }) + + growAmount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "grow_amount", + Help: "Total amount of bytes buffers in the pool have grown by.", + }) + + shrinkCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "shrink_count", + Help: "Total number of times buffers in the pool have shrunk.", + }) + + shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "shrink_amount", + Help: "Total amount of bytes buffers in the pool have shrunk by.", + }) + + checkoutDurationTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "checkout_duration_total_us", + Help: "Total duration in microseconds of buffer checkouts.", + }) + + checkoutDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "checkout_duration_us", + Help: "Duration in microseconds of buffer checkouts.", + Buckets: []float64{50, 500, 5000}, + }) + + checkoutCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "checkout_count", + Help: "Total number of buffer checkouts.", + }) + + totalWriteSize = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "total_write_size_bytes", + Help: "Total size of data written by the BufferedFileWriter in bytes.", + }) + + totalWriteDuration = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "total_write_duration_microseconds", + Help: "Total duration of write operations by the BufferedFileWriter in microseconds.", + }) + + diskWriteCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "disk_write_count", + Help: "Total number of times data was written to disk by the BufferedFileWriter.", + }) + + fileSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "file_size_bytes", + Help: "Sizes of files created by the BufferedFileWriter.", + Buckets: prometheus.ExponentialBuckets(defaultThreshold, 2, 4), + }) +)