From 9966194730f00cf21323df3d342e3773d2273399 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 9 May 2018 16:46:52 -0700 Subject: [PATCH 1/4] naming of constants --- client/driver/logging/rotator.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go index bd3ed86a1fd..8533ba0014e 100644 --- a/client/driver/logging/rotator.go +++ b/client/driver/logging/rotator.go @@ -15,8 +15,11 @@ import ( ) const ( - bufSize = 32768 - flushDur = 100 * time.Millisecond + // logBufferSize is the size of the buffer. + logBufferSize = 32768 + + // bufferFlushDuration is the duration at which we flush the buffer. + bufferFlushDuration = 100 * time.Millisecond ) // FileRotator writes bytes to a rotated set of files @@ -53,7 +56,7 @@ func NewFileRotator(path string, baseFile string, maxFiles int, path: path, baseFileName: baseFile, - flushTicker: time.NewTicker(flushDur), + flushTicker: time.NewTicker(bufferFlushDuration), logger: logger, purgeCh: make(chan struct{}, 1), doneCh: make(chan struct{}, 1), @@ -283,7 +286,7 @@ func (f *FileRotator) createOrResetBuffer() { f.bufLock.Lock() defer f.bufLock.Unlock() if f.bufw == nil { - f.bufw = bufio.NewWriterSize(f.currentFile, bufSize) + f.bufw = bufio.NewWriterSize(f.currentFile, logBufferSize) } else { f.bufw.Reset(f.currentFile) } From 99182736e8bc4cd4b3cf9d59525baf59f79addfa Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 9 May 2018 17:06:52 -0700 Subject: [PATCH 2/4] Benchmark for rotator BenchmarkRotator/1KB-8 200000 5572 ns/op BenchmarkRotator/2KB-8 200000 8338 ns/op BenchmarkRotator/4KB-8 100000 14246 ns/op BenchmarkRotator/8KB-8 50000 25279 ns/op BenchmarkRotator/16KB-8 30000 48602 ns/op BenchmarkRotator/32KB-8 20000 92159 ns/op BenchmarkRotator/64KB-8 10000 154766 ns/op BenchmarkRotator/128KB-8 5000 296872 ns/op BenchmarkRotator/256KB-8 3000 551793 ns/op --- client/driver/logging/rotator.go | 2 +- client/driver/logging/rotator_test.go | 46 +++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go index 8533ba0014e..056c936a32e 100644 --- a/client/driver/logging/rotator.go +++ b/client/driver/logging/rotator.go @@ -16,7 +16,7 @@ import ( const ( // logBufferSize is the size of the buffer. - logBufferSize = 32768 + logBufferSize = 32 * 1024 // bufferFlushDuration is the duration at which we flush the buffer. bufferFlushDuration = 100 * time.Millisecond diff --git a/client/driver/logging/rotator_test.go b/client/driver/logging/rotator_test.go index 68b200e872e..052b07fc9e3 100644 --- a/client/driver/logging/rotator_test.go +++ b/client/driver/logging/rotator_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "log" + "math/rand" "os" "path/filepath" "testing" @@ -289,3 +290,48 @@ func TestFileRotator_PurgeOldFiles(t *testing.T) { t.Fatalf("%v", lastErr) }) } + +func BenchmarkRotator(b *testing.B) { + kb := 1024 + for _, inputSize := range []int{kb, 2 * kb, 4 * kb, 8 * kb, 16 * kb, 32 * kb, 64 * kb, 128 * kb, 256 * kb} { + b.Run(fmt.Sprintf("%dKB", inputSize/kb), func(b *testing.B) { + benchmarkRotatorWithInputSize(inputSize, b) + }) + } +} + +func benchmarkRotatorWithInputSize(size int, b *testing.B) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + b.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fr, err := NewFileRotator(path, baseFileName, 5, 1024*1024, logger) + if err != nil { + b.Fatalf("test setup err: %v", err) + } + b.ResetTimer() + + // run the Fib function b.N times + for n := 0; n < b.N; n++ { + // Generate some input + data := make([]byte, size) + _, err := rand.Read(data) + if err != nil { + b.Fatalf("Error generating date: %v", err) + } + + // Insert random new lines + for i := 0; i < 100; i++ { + index := rand.Intn(size) + data[index] = '\n' + } + + // Write the data + if _, err := fr.Write(data); err != nil { + b.Fatalf("Failed to write data: %v", err) + } + } +} From 8aaf0746999b4c569c0ae61c1fbf4850c1b62a75 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 10 May 2018 14:38:45 -0700 Subject: [PATCH 3/4] Avoid splitting log line across two files We attempt to avoid splitting a log line between two files by detecting if we are near the file size limit and scanning for new lines and only flushing those. BenchmarkRotator/1KB-8 300000 5613 ns/op BenchmarkRotator/2KB-8 200000 8384 ns/op BenchmarkRotator/4KB-8 100000 14604 ns/op BenchmarkRotator/8KB-8 50000 25002 ns/op BenchmarkRotator/16KB-8 30000 47572 ns/op BenchmarkRotator/32KB-8 20000 92080 ns/op BenchmarkRotator/64KB-8 10000 165883 ns/op BenchmarkRotator/128KB-8 5000 294405 ns/op BenchmarkRotator/256KB-8 2000 572374 ns/op --- client/driver/logging/rotator.go | 56 ++++++++++++++++++----- client/driver/logging/rotator_test.go | 66 +++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 11 deletions(-) diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go index 056c936a32e..152ab55086d 100644 --- a/client/driver/logging/rotator.go +++ b/client/driver/logging/rotator.go @@ -2,6 +2,7 @@ package logging import ( "bufio" + "bytes" "fmt" "io/ioutil" "log" @@ -20,6 +21,15 @@ const ( // bufferFlushDuration is the duration at which we flush the buffer. bufferFlushDuration = 100 * time.Millisecond + + // lineScanLimit is the number of bytes we will attempt to scan for new + // lines when approaching the end of the file to avoid a log line being + // split between two files. Any single line that is greater than this limit + // may be split. + lineScanLimit = 16 * 1024 + + // newLineDelimiter is the delimiter used for new lines. + newLineDelimiter = '\n' ) // FileRotator writes bytes to a rotated set of files @@ -73,12 +83,13 @@ func NewFileRotator(path string, baseFile string, maxFiles int, // equal to the maximum size the user has defined. func (f *FileRotator) Write(p []byte) (n int, err error) { n = 0 - var nw int + var forceRotate bool for n < len(p) { // Check if we still have space in the current file, otherwise close and // open the next file - if f.currentWr >= f.FileSize { + if forceRotate || f.currentWr >= f.FileSize { + forceRotate = false f.flushBuffer() f.currentFile.Close() if err := f.nextFile(); err != nil { @@ -86,15 +97,38 @@ func (f *FileRotator) Write(p []byte) (n int, err error) { return 0, err } } - // Calculate the remaining size on this file - remainingSize := f.FileSize - f.currentWr - - // Check if the number of bytes that we have to write is less than the - // remaining size of the file - if remainingSize < int64(len(p[n:])) { - // Write the number of bytes that we can write on the current file - li := int64(n) + remainingSize - nw, err = f.writeToBuffer(p[n:li]) + // Calculate the remaining size on this file and how much we have left + // to write + remainingSpace := f.FileSize - f.currentWr + remainingToWrite := int64(len(p[n:])) + + // Check if we are near the end of the file. If we are we attempt to + // avoid a log line being split between two files. + var nw int + if (remainingSpace - lineScanLimit) < remainingToWrite { + // Scan for new line and if the data up to new line fits in current + // file, write to buffer + idx := bytes.IndexByte(p[n:], newLineDelimiter) + if idx >= 0 && (remainingSpace-int64(idx)-1) >= 0 { + // We have space so write it to buffer + nw, err = f.writeToBuffer(p[n : n+idx+1]) + } else if idx >= 0 { + // We found a new line but don't have space so just force rotate + forceRotate = true + } else if remainingToWrite > f.FileSize || f.FileSize-lineScanLimit < 0 { + // There is no new line remaining but there is no point in + // rotating since the remaining data will not even fit in the + // next file either so just fill this one up. + li := int64(n) + remainingSpace + if remainingSpace > remainingToWrite { + li = int64(n) + remainingToWrite + } + nw, err = f.writeToBuffer(p[n:li]) + } else { + // There is no new line in the data remaining for us to write + // and it will fit in the next file so rotate. + forceRotate = true + } } else { // Write all the bytes in the current file nw, err = f.writeToBuffer(p[n:]) diff --git a/client/driver/logging/rotator_test.go b/client/driver/logging/rotator_test.go index 052b07fc9e3..7eaf4188719 100644 --- a/client/driver/logging/rotator_test.go +++ b/client/driver/logging/rotator_test.go @@ -169,6 +169,72 @@ func TestFileRotator_RotateFiles(t *testing.T) { }) } +func TestFileRotator_RotateFiles_Boundary(t *testing.T) { + t.Parallel() + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fr, err := NewFileRotator(path, baseFileName, 10, 5, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + // We will write two times: + // 1st: Write with new lines spanning two files + // 2nd: Write long string with no new lines + expectations := [][]byte{ + []byte("ab\n"), + []byte("cdef\n"), + []byte("12345"), + []byte("6789"), + } + str1 := "ab\ncdef\n" + str2 := "123456789" + + nw, err := fr.Write([]byte(str1)) + if err != nil { + t.Fatalf("got error while writing: %v", err) + } + + if nw != len(str1) { + t.Fatalf("expected %v, got %v", len(str1), nw) + } + + nw, err = fr.Write([]byte(str2)) + if err != nil { + t.Fatalf("got error while writing: %v", err) + } + + if nw != len(str2) { + t.Fatalf("expected %v, got %v", len(str2), nw) + } + + var lastErr error + testutil.WaitForResult(func() (bool, error) { + + for i, exp := range expectations { + fname := filepath.Join(path, fmt.Sprintf("redis.stdout.%d", i)) + fi, err := os.Stat(fname) + if err != nil { + lastErr = err + return false, nil + } + if int(fi.Size()) != len(exp) { + lastErr = fmt.Errorf("expected size: %v, actual: %v", len(exp), fi.Size()) + return false, nil + } + } + + return true, nil + }, func(err error) { + t.Fatalf("%v", lastErr) + }) +} + func TestFileRotator_WriteRemaining(t *testing.T) { t.Parallel() var path string From 80ac8c2a37ee0857fa410b61763e0ac4f52a7f05 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 11 May 2018 10:52:09 -0700 Subject: [PATCH 4/4] Add new line test --- client/driver/logging/rotator_test.go | 31 +++++++++++---------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/client/driver/logging/rotator_test.go b/client/driver/logging/rotator_test.go index 7eaf4188719..4fcf23952a0 100644 --- a/client/driver/logging/rotator_test.go +++ b/client/driver/logging/rotator_test.go @@ -183,34 +183,27 @@ func TestFileRotator_RotateFiles_Boundary(t *testing.T) { t.Fatalf("test setup err: %v", err) } - // We will write two times: + // We will write three times: // 1st: Write with new lines spanning two files // 2nd: Write long string with no new lines + // 3rd: Write a single new line expectations := [][]byte{ []byte("ab\n"), []byte("cdef\n"), []byte("12345"), - []byte("6789"), + []byte("67890"), + []byte("\n"), } - str1 := "ab\ncdef\n" - str2 := "123456789" - nw, err := fr.Write([]byte(str1)) - if err != nil { - t.Fatalf("got error while writing: %v", err) - } - - if nw != len(str1) { - t.Fatalf("expected %v, got %v", len(str1), nw) - } - - nw, err = fr.Write([]byte(str2)) - if err != nil { - t.Fatalf("got error while writing: %v", err) - } + for _, str := range []string{"ab\ncdef\n", "1234567890", "\n"} { + nw, err := fr.Write([]byte(str)) + if err != nil { + t.Fatalf("got error while writing: %v", err) + } - if nw != len(str2) { - t.Fatalf("expected %v, got %v", len(str2), nw) + if nw != len(str) { + t.Fatalf("expected %v, got %v", len(str), nw) + } } var lastErr error