Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing race issues in log rotator #1047

Merged
merged 3 commits into from
Apr 7, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions client/driver/logging/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
"time"
)

var (
const (
bufSize = 32768
flushDur = 100 * time.Millisecond
)

Expand All @@ -31,6 +32,7 @@ type FileRotator struct {
currentFile *os.File // currentFile is the file that is currently getting written
currentWr int64 // currentWr is the number of bytes written to the current file
bufw *bufio.Writer
bufLock sync.Mutex

flushTicker *time.Ticker
logger *log.Logger
Expand Down Expand Up @@ -74,9 +76,10 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
// Check if we still have space in the current file, otherwise close and
// open the next file
if f.currentWr >= f.FileSize {
f.bufw.Flush()
f.flushBuffer()
f.currentFile.Close()
if err := f.nextFile(); err != nil {
f.logger.Printf("[ERROR] driver.rotator: error creating next file: %v", err)
return 0, err
}
}
Expand All @@ -88,12 +91,10 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
if remainingSize < int64(len(p[n:])) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should scan backwards through the buffer for a \n and write/flush everything before that. Whatever comes after that last \n should be moved to the beginning of the buffer and written out on the next write (as long as the line length is less than the buffer length, in which case you need to truncate). This only needs to happen on file rotation, not on write. I'll open an issue re: this in a sec.

// Write the number of bytes that we can write on the current file
li := int64(n) + remainingSize
nw, err = f.bufw.Write(p[n:li])
//nw, err = f.currentFile.Write(p[n:li])
nw, err = f.writeToBuffer(p[n:li])
} else {
// Write all the bytes in the current file
nw, err = f.bufw.Write(p[n:])
//nw, err = f.currentFile.Write(p[n:])
nw, err = f.writeToBuffer(p[n:])
}

// Increment the number of bytes written so far in this method
Expand All @@ -103,6 +104,7 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
// Increment the total number of bytes in the file
f.currentWr += int64(n)
if err != nil {
f.logger.Printf("[ERROR] driver.rotator: error writing to file: %v", err)
return
}
}
Expand Down Expand Up @@ -179,21 +181,15 @@ func (f *FileRotator) createFile() error {
return err
}
f.currentWr = fi.Size()
if f.bufw == nil {
f.bufw = bufio.NewWriter(f.currentFile)
} else {
f.bufw.Reset(f.currentFile)
}
f.createOrResetBuffer()
return nil
}

// flushPeriodically flushes the buffered writer every 100ms to the underlying
// file
func (f *FileRotator) flushPeriodically() {
for _ = range f.flushTicker.C {
if f.bufw != nil {
f.bufw.Flush()
}
f.flushBuffer()
}
}

Expand All @@ -203,9 +199,7 @@ func (f *FileRotator) Close() {

// Stop the ticker and flush for one last time
f.flushTicker.Stop()
if f.bufw != nil {
f.bufw.Flush()
}
f.flushBuffer()

// Stop the purge go routine
if !f.closed {
Expand Down Expand Up @@ -258,3 +252,32 @@ func (f *FileRotator) purgeOldFiles() {
}
}
}

// flushBuffer flushes the buffer
func (f *FileRotator) flushBuffer() error {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw != nil {
return f.bufw.Flush()
}
return nil
}

// writeToBuffer writes the byte array to buffer
func (f *FileRotator) writeToBuffer(p []byte) (int, error) {
f.bufLock.Lock()
defer f.bufLock.Unlock()
return f.bufw.Write(p)
}

// createOrResetBuffer creates a new buffer if we don't have one otherwise
// resets the buffer
func (f *FileRotator) createOrResetBuffer() {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw == nil {
f.bufw = bufio.NewWriterSize(f.currentFile, bufSize)
} else {
f.bufw.Reset(f.currentFile)
}
}