Skip to content

Commit

Permalink
Output file fixes
Browse files Browse the repository at this point in the history
* Fixed flush/Write race
* Fixed names with underscores
  • Loading branch information
buger committed Jun 14, 2016
1 parent c4271ff commit 95eecb6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
20 changes: 16 additions & 4 deletions output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"time"
"sync"
)

var dateFileNameFuncs = map[string]func() string{
Expand All @@ -33,6 +34,7 @@ type FileOutputConfig struct {

// FileOutput output plugin
type FileOutput struct {
mu sync.Mutex
pathTemplate string
currentName string
file *os.File
Expand Down Expand Up @@ -87,7 +89,9 @@ func setFileIndex(name string, idx int) string {
withoutExt := strings.TrimSuffix(name, ext)

if i := strings.LastIndex(withoutExt, "_"); i != -1 {
withoutExt = withoutExt[:i]
if _, err := strconv.Atoi(withoutExt[i+1:]); err == nil {
withoutExt = withoutExt[:i]
}
}

return withoutExt + "_" + idxS + ext
Expand Down Expand Up @@ -120,6 +124,9 @@ func (s sortByFileIndex) Less(i, j int) bool {
}

func (o *FileOutput) filename() string {
defer o.mu.Unlock()
o.mu.Lock()

path := o.pathTemplate

for name, fn := range dateFileNameFuncs {
Expand Down Expand Up @@ -172,6 +179,7 @@ func (o *FileOutput) Write(data []byte) (n int, err error) {
}

if o.file == nil || o.currentName != o.file.Name() {
o.mu.Lock()
o.Close()

o.file, err = os.OpenFile(o.currentName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
Expand All @@ -188,6 +196,7 @@ func (o *FileOutput) Write(data []byte) (n int, err error) {
}

o.queueLength = 0
o.mu.Unlock()
}

o.writer.Write(data)
Expand All @@ -199,16 +208,19 @@ func (o *FileOutput) Write(data []byte) (n int, err error) {
}

func (o *FileOutput) flush() {
defer o.mu.Unlock()
o.mu.Lock()

if o.file != nil {
if strings.HasSuffix(o.currentName, ".gz") {
o.writer.(*gzip.Writer).Flush()
} else {
o.writer.(*bufio.Writer).Flush()
}
}

if stat, err := o.file.Stat(); err != nil {
o.chunkSize = int(stat.Size())
if stat, err := o.file.Stat(); err != nil {
o.chunkSize = int(stat.Size())
}
}
}

Expand Down
1 change: 1 addition & 0 deletions output_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func TestSetFileIndex(t *testing.T) {
{"/tmp/logs_1", 0, "/tmp/logs_0"},
{"/tmp/logs_0", 10, "/tmp/logs_10"},
{"/tmp/logs_0.gz", 10, "/tmp/logs_10.gz"},
{"/tmp/logs_underscores.gz", 10, "/tmp/logs_underscores_10.gz"},
}

for _, c := range tests {
Expand Down

0 comments on commit 95eecb6

Please sign in to comment.