Skip to content

Commit

Permalink
[feat] - buffered file writer metrics (#2395)
Browse files Browse the repository at this point in the history
* use diff chan

* correctly use the buffered file writer

* use value from source

* reorder fields

* add tests and update

* Fix issue with buffer slices growing

* fix test

* correctly use the buffered file writer

* use value from source

* reorder fields

* fix

* add singleton

* use shared pool

* optimize

* rename and cleanup

* add metrics

* add print

* rebase

* remove extra inc

* add metrics for checkout time

* add comment

* use microseconds

* add metrics

* add metrics pkg

* add more metrics

* revert test

* remove fields

* fix

* resize and return

* update metric name

* remove comment

* address comments

* add comment
  • Loading branch information
ahrav committed Feb 11, 2024
1 parent 658857a commit 5702d2e
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 16 deletions.
111 changes: 95 additions & 16 deletions pkg/writers/buffered_file_writer/bufferedfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,51 @@ import (
"io"
"os"
"sync"
"time"

"github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
)

// bufferPoolMetrics groups the methods that record buffer pool activity
// (growth, shrink, checkout and return events) in the package's metrics.
// It carries no state of its own.
type bufferPoolMetrics struct{}

// recordGrowth counts one buffer growth event and adds growthAmount to the
// running total of bytes that buffers have grown by.
func (bufferPoolMetrics) recordGrowth(growthAmount int) {
	grownBy := float64(growthAmount)

	growCount.Inc()
	growAmount.Add(grownBy)
}

// recordShrink counts one buffer shrink event and adds amount to the running
// total of bytes that buffers have shrunk by.
func (bufferPoolMetrics) recordShrink(amount int) {
	shrunkBy := float64(amount)

	shrinkCount.Inc()
	shrinkAmount.Add(shrunkBy)
}

// recordCheckoutDuration records how long a buffer was checked out of the pool.
// The duration is converted to whole microseconds and fed to both the checkout
// histogram and the cumulative checkout-time total; the checkout counter is
// incremented once per call.
func (bufferPoolMetrics) recordCheckoutDuration(duration time.Duration) {
	elapsedMicros := float64(duration.Microseconds())

	checkoutDuration.Observe(elapsedMicros)
	checkoutCount.Inc()
	checkoutDurationTotal.Add(elapsedMicros)
}

// recordBufferRetrival notes that a buffer has been handed out by the pool:
// the buffer count gauge and the active-buffer gauge each go up by one.
func (bufferPoolMetrics) recordBufferRetrival() {
	bufferCount.Inc()
	activeBufferCount.Inc()
}

// recordBufferReturn notes that a buffer has come back to the pool. The
// active-buffer gauge drops by one, and the returned buffer's capacity (bufCap)
// and length (bufLen) are added to the total size and total length gauges.
func (bufferPoolMetrics) recordBufferReturn(bufCap, bufLen int64) {
	capacity, length := float64(bufCap), float64(bufLen)

	activeBufferCount.Dec()
	totalBufferSize.Add(capacity)
	totalBufferLength.Add(length)
}

// bufPoolOpt is a functional option that modifies a bufferPool; options are
// accepted by newBufferPool.
type bufPoolOpt func(pool *bufferPool)

// bufferPool is a pool of reusable buffers built on an embedded sync.Pool.
type bufferPool struct {
	// bufferSize is the initial capacity, in bytes, given to buffers the pool creates.
	bufferSize uint32
	*sync.Pool

	// metrics records pool activity such as checkouts, returns, growth and shrinkage.
	metrics bufferPoolMetrics
}

const defaultBufferSize = 2 << 10 // 2KB
const defaultBufferSize = 1 << 12 // 4KB
func newBufferPool(opts ...bufPoolOpt) *bufferPool {
pool := &bufferPool{bufferSize: defaultBufferSize}

Expand All @@ -29,8 +61,7 @@ func newBufferPool(opts ...bufPoolOpt) *bufferPool {
}
pool.Pool = &sync.Pool{
New: func() any {
buf := new(bytes.Buffer)
buf.Grow(int(pool.bufferSize))
buf := &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))}
return buf
},
}
Expand All @@ -44,22 +75,40 @@ var sharedBufferPool *bufferPool

// init sets up the package-wide shared buffer pool using newBufferPool with
// no options.
func init() {
	sharedBufferPool = newBufferPool()
}

func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer {
buf, ok := bp.Pool.Get().(*bytes.Buffer)
// buffer is a wrapper around bytes.Buffer that includes a timestamp for tracking buffer checkout duration.
type buffer struct {
	// Buffer is embedded, so all bytes.Buffer methods are available directly on buffer.
	*bytes.Buffer
	// checkedOut is the time the buffer was last retrieved from the pool; it is
	// set by bufferPool.get and read by bufferPool.put.
	checkedOut time.Time
}

// get checks a buffer out of the pool.
//
// If the pool hands back a value that is not a *buffer, the problem is logged
// through ctx's logger and a fresh buffer with the pool's configured capacity
// is used instead. The checkout timestamp is refreshed on every call so the
// checkout duration can be measured when the buffer is returned, and the
// retrieval is recorded in the pool metrics.
func (bp *bufferPool) get(ctx context.Context) *buffer {
	buf, ok := bp.Pool.Get().(*buffer)
	if !ok {
		ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer")
		// The fallback must be wrapped in *buffer; a bare *bytes.Buffer is not
		// assignable to buf and would lose the checkout timestamp.
		buf = &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, bp.bufferSize))}
	}
	buf.checkedOut = time.Now()
	bp.metrics.recordBufferRetrival()

	return buf
}

func (bp *bufferPool) put(buf *bytes.Buffer) {
// If the buffer is more than twice the default size, release it for garbage collection.
// growBufferWithSize records a growth of size bytes in the pool metrics and
// then grows buf's underlying bytes.Buffer by that amount, so that explicit
// growth is always reflected in the metrics.
func (bp *bufferPool) growBufferWithSize(buf *buffer, size int) {
	bp.metrics.recordGrowth(size)

	// Make room for the incoming data.
	buf.Buffer.Grow(size)
}

func (bp *bufferPool) put(buf *buffer) {
bp.metrics.recordBufferReturn(int64(buf.Cap()), int64(buf.Len()))
bp.metrics.recordCheckoutDuration(time.Since(buf.checkedOut))

// If the buffer is more than twice the default size, replace it with a new buffer.
// This prevents us from returning very large buffers to the pool.
const maxAllowedCapacity = 2 * defaultBufferSize
if buf.Cap() > maxAllowedCapacity {
buf = nil // Release the large buffer for garbage collection.
bp.metrics.recordShrink(buf.Cap() - defaultBufferSize)
buf = &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, bp.bufferSize))}
} else {
// Reset the buffer to clear any existing data.
buf.Reset()
Expand All @@ -68,6 +117,23 @@ func (bp *bufferPool) put(buf *bytes.Buffer) {
bp.Put(buf)
}

// bufferedFileWriterMetrics groups the methods that record BufferedFileWriter
// activity (data written and disk writes) in the package's metrics.
// It carries no state of its own.
type bufferedFileWriterMetrics struct{}

// recordDataProcessed adds size bytes to the total write size and dur, in
// whole microseconds, to the total write duration.
func (bufferedFileWriterMetrics) recordDataProcessed(size uint64, dur time.Duration) {
	bytesWritten := float64(size)
	elapsedMicros := float64(dur.Microseconds())

	totalWriteSize.Add(bytesWritten)
	totalWriteDuration.Add(elapsedMicros)
}

// recordDiskWrite counts one disk write and observes the current size of f in
// the file size histogram. If f cannot be stat'ed, the error is logged through
// ctx's logger and no size is observed; the disk write is still counted.
//
// NOTE(review): the size is whatever f.Stat reports at call time. The visible
// caller invokes this right after creating the temp file, before the buffered
// data is transferred, so the observed size may be 0 — confirm this is intended.
func (bufferedFileWriterMetrics) recordDiskWrite(ctx context.Context, f *os.File) {
	diskWriteCount.Inc()

	info, statErr := f.Stat()
	if statErr != nil {
		ctx.Logger().Error(statErr, "failed to get file size for metric")
		return
	}

	fileSizeHistogram.Observe(float64(info.Size()))
}

// state represents the current mode of BufferedFileWriter.
// The writer's state field is described as being either writeOnly or readOnly.
type state uint8

Expand All @@ -84,12 +150,14 @@ type BufferedFileWriter struct {
threshold uint64 // Threshold for switching to file writing.
size uint64 // Total size of the data written.

state state // Current state of the writer. (writeOnly or readOnly)

bufPool *bufferPool // Pool for storing buffers for reuse.
buf *bytes.Buffer // Buffer for storing data under the threshold in memory.
buf *buffer // Buffer for storing data under the threshold in memory.
filename string // Name of the temporary file.
file io.WriteCloser // File for storing data over the threshold.

state state // Current state of the writer. (writeOnly or readOnly)

metrics bufferedFileWriterMetrics
}

// Option is a function that modifies a BufferedFileWriter.
Expand All @@ -100,9 +168,9 @@ func WithThreshold(threshold uint64) Option {
return func(w *BufferedFileWriter) { w.threshold = threshold }
}

const defaultThreshold = 10 * 1024 * 1024 // 10MB
// New creates a new BufferedFileWriter with the given options.
func New(opts ...Option) *BufferedFileWriter {
const defaultThreshold = 10 * 1024 * 1024 // 10MB
w := &BufferedFileWriter{
threshold: defaultThreshold,
state: writeOnly,
Expand All @@ -111,6 +179,7 @@ func New(opts ...Option) *BufferedFileWriter {
for _, opt := range opts {
opt(w)
}

return w
}

Expand Down Expand Up @@ -158,17 +227,20 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error

bufferLength := w.buf.Len()

defer func() {
start := time.Now()
defer func(start time.Time) {
w.metrics.recordDataProcessed(size, time.Since(start))

w.size += size
ctx.Logger().V(4).Info(
"write complete",
"data_size", size,
"content_size", bufferLength,
"total_size", w.size,
)
}()
}(start)

totalSizeNeeded := uint64(bufferLength) + uint64(len(data))
totalSizeNeeded := uint64(bufferLength) + size
if totalSizeNeeded <= w.threshold {
// If the total size is within the threshold, write to the buffer.
ctx.Logger().V(4).Info(
Expand All @@ -187,6 +259,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
"available_space", availableSpace,
"grow_size", growSize,
)
// We are manually growing the buffer so we can track the growth via metrics.
// Knowing the exact data size, we directly resize to fit it, rather than exponential growth
// which may require multiple allocations and copies if the size required is much larger
// than double the capacity. Our approach aligns with default behavior when growth sizes
// happen to match current capacity, retaining asymptotic efficiency benefits.
w.bufPool.growBufferWithSize(w.buf, growSize)
}

return w.buf.Write(data)
Expand All @@ -202,6 +280,7 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error

w.filename = file.Name()
w.file = file
w.metrics.recordDiskWrite(ctx, file)

// Transfer existing data in buffer to the file, then clear the buffer.
// This ensures all the data is in one place - either entirely in the buffer or the file.
Expand Down
117 changes: 117 additions & 0 deletions pkg/writers/buffered_file_writer/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package bufferedfilewriter

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
)

// Prometheus metrics for the buffer pool and the BufferedFileWriter.
// All of them are registered via promauto under the common metrics namespace
// and subsystem.
var (
	// activeBufferCount goes up when a buffer is retrieved from the pool and
	// down when it is returned.
	activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "active_buffer_count",
		Help: "Current number of active buffers.",
	})

	// bufferCount is incremented on every buffer retrieval.
	// NOTE(review): in the visible code this gauge is only ever incremented, so
	// it appears to track retrievals rather than live buffers — confirm intent.
	bufferCount = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "buffer_count",
		Help: "Total number of buffers managed by the pool.",
	})

	// totalBufferLength accumulates the length of each buffer at the time it is
	// returned to the pool.
	totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "total_buffer_length",
		Help: "Total length of all buffers combined.",
	})

	// totalBufferSize accumulates the capacity of each buffer at the time it is
	// returned to the pool.
	totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "total_buffer_size",
		Help: "Total size of all buffers combined.",
	})

	// growCount counts explicit buffer growth events.
	growCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "grow_count",
		Help: "Total number of times buffers in the pool have grown.",
	})

	// growAmount sums the number of bytes requested across growth events.
	growAmount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "grow_amount",
		Help: "Total amount of bytes buffers in the pool have grown by.",
	})

	// shrinkCount counts the times an oversized buffer was replaced on return.
	shrinkCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "shrink_count",
		Help: "Total number of times buffers in the pool have shrunk.",
	})

	// shrinkAmount sums the bytes released across shrink events.
	shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "shrink_amount",
		Help: "Total amount of bytes buffers in the pool have shrunk by.",
	})

	// checkoutDurationTotal sums, in microseconds, the time buffers spent
	// checked out of the pool.
	checkoutDurationTotal = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "checkout_duration_total_us",
		Help: "Total duration in microseconds of buffer checkouts.",
	})

	// checkoutDuration is the distribution of individual checkout durations in
	// microseconds, with bucket bounds at 50µs, 500µs and 5ms.
	checkoutDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "checkout_duration_us",
		Help: "Duration in microseconds of buffer checkouts.",
		Buckets: []float64{50, 500, 5000},
	})

	// checkoutCount is incremented each time a checkout duration is recorded.
	checkoutCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "checkout_count",
		Help: "Total number of buffer checkouts.",
	})

	// totalWriteSize sums the size in bytes of data passed to the writer.
	totalWriteSize = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "total_write_size_bytes",
		Help: "Total size of data written by the BufferedFileWriter in bytes.",
	})

	// totalWriteDuration sums, in microseconds, the time spent in write operations.
	totalWriteDuration = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "total_write_duration_microseconds",
		Help: "Total duration of write operations by the BufferedFileWriter in microseconds.",
	})

	// diskWriteCount is incremented each time a disk write is recorded.
	diskWriteCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "disk_write_count",
		Help: "Total number of times data was written to disk by the BufferedFileWriter.",
	})

	// fileSizeHistogram is the distribution of observed file sizes in bytes.
	// Its four bucket bounds start at defaultThreshold and double each step
	// (1x, 2x, 4x and 8x the threshold).
	fileSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace: common.MetricsNamespace,
		Subsystem: common.MetricsSubsystem,
		Name: "file_size_bytes",
		Help: "Sizes of files created by the BufferedFileWriter.",
		Buckets: prometheus.ExponentialBuckets(defaultThreshold, 2, 4),
	})
)

0 comments on commit 5702d2e

Please sign in to comment.