Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ahrav committed Feb 7, 2024
1 parent 72dfce7 commit d27c15a
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions pkg/writers/buffered_file_writer/bufferedfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,28 @@ import (

type bufferPoolMetrics struct{}

func (b *bufferPoolMetrics) recordGrowth(growthAmount int) {
func (bufferPoolMetrics) recordGrowth(growthAmount int) {
growCount.Inc()
growAmount.Add(float64(growthAmount))
}

func (b *bufferPoolMetrics) recordShrink(amount int) {
func (bufferPoolMetrics) recordShrink(amount int) {
shrinkCount.Inc()
shrinkAmount.Add(float64(amount))
}

func (b *bufferPoolMetrics) recordCheckoutDuration(duration time.Duration) {
func (bufferPoolMetrics) recordCheckoutDuration(duration time.Duration) {
checkoutDuration.Observe(float64(duration.Microseconds()))
checkoutCount.Inc()
checkoutDurationTotal.Add(float64(duration.Microseconds()))
}

func (b *bufferPoolMetrics) recordBufferRetrival() {
func (bufferPoolMetrics) recordBufferRetrival() {
activeBufferCount.Inc()
bufferCount.Inc()
}

func (b *bufferPoolMetrics) recordBufferReturn(bufCap, bufLen int64) {
func (bufferPoolMetrics) recordBufferReturn(bufCap, bufLen int64) {
activeBufferCount.Dec()
totalBufferSize.Add(float64(bufCap))
totalBufferLength.Add(float64(bufLen))
Expand All @@ -49,12 +49,12 @@ type bufferPool struct {
bufferSize uint32
*sync.Pool

metrics *bufferPoolMetrics
metrics bufferPoolMetrics
}

const defaultBufferSize = 2 << 12 // 4KB
const defaultBufferSize = 1 << 12 // 4KB
func newBufferPool(opts ...bufPoolOpt) *bufferPool {
pool := &bufferPool{bufferSize: defaultBufferSize, metrics: new(bufferPoolMetrics)}
pool := &bufferPool{bufferSize: defaultBufferSize}

for _, opt := range opts {
opt(pool)
Expand Down Expand Up @@ -119,12 +119,12 @@ func (bp *bufferPool) put(buf *buffer) {

type bufferedFileWriterMetrics struct{}

func (*bufferedFileWriterMetrics) recordDataProcessed(size uint64, dur time.Duration) {
func (bufferedFileWriterMetrics) recordDataProcessed(size uint64, dur time.Duration) {
totalWriteSize.Add(float64(size))
totalWriteDuration.Add(float64(dur.Microseconds()))
}

func (*bufferedFileWriterMetrics) recordDiskWrite(ctx context.Context, f *os.File) {
func (bufferedFileWriterMetrics) recordDiskWrite(ctx context.Context, f *os.File) {
diskWriteCount.Inc()
size, err := f.Stat()
if err != nil {
Expand All @@ -150,14 +150,14 @@ type BufferedFileWriter struct {
threshold uint64 // Threshold for switching to file writing.
size uint64 // Total size of the data written.

state state // Current state of the writer. (writeOnly or readOnly)

bufPool *bufferPool // Pool for storing buffers for reuse.
buf *buffer // Buffer for storing data under the threshold in memory.
filename string // Name of the temporary file.
file io.WriteCloser // File for storing data over the threshold.

metrics *bufferedFileWriterMetrics
state state // Current state of the writer. (writeOnly or readOnly)

metrics bufferedFileWriterMetrics
}

// Option is a function that modifies a BufferedFileWriter.
Expand All @@ -175,7 +175,6 @@ func New(opts ...Option) *BufferedFileWriter {
threshold: defaultThreshold,
state: writeOnly,
bufPool: sharedBufferPool,
metrics: new(bufferedFileWriterMetrics),
}
for _, opt := range opts {
opt(w)
Expand Down

0 comments on commit d27c15a

Please sign in to comment.