Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

file logger: compressing is no more blocking #854

Merged
merged 1 commit into from
Oct 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 129 additions & 82 deletions workers/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package workers
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
Expand All @@ -16,6 +15,8 @@ import (
"strings"
"time"

"github.com/klauspost/compress/gzip"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-dnscollector/transformers"
Expand Down Expand Up @@ -53,20 +54,28 @@ type LogFile struct {
fileFd *os.File
fileSize int64
fileDir, fileName, fileExt, filePrefix string
commpressTimer *time.Timer
textFormat []string
compressQueue chan string
}

func NewLogFile(config *pkgconfig.Config, logger *logger.Logger, name string) *LogFile {
bufSize := config.Global.Worker.ChannelBufferSize
if config.Loggers.LogFile.ChannelBufferSize > 0 {
bufSize = config.Loggers.LogFile.ChannelBufferSize
}
w := &LogFile{GenericWorker: NewGenericWorker(config, logger, name, "file", bufSize, pkgconfig.DefaultMonitor)}
w := &LogFile{
GenericWorker: NewGenericWorker(config, logger, name, "file", bufSize, pkgconfig.DefaultMonitor),
compressQueue: make(chan string, 1),
}
w.ReadConfig()
if err := w.OpenFile(); err != nil {
if err := w.OpenCurrentFile(); err != nil {
w.LogFatal(pkgconfig.PrefixLogWorker+"["+name+"] file - unable to open output file:", err)
}

// start compressor
go w.startCompressor()
w.initializeCompressionQueue()

return w
}

Expand All @@ -88,7 +97,7 @@ func (w *LogFile) ReadConfig() {
w.LogInfo("running in mode: %s", w.GetConfig().Loggers.LogFile.Mode)
}

func (w *LogFile) Cleanup() error {
func (w *LogFile) RemoveOldFiles() error {
if w.GetConfig().Loggers.LogFile.MaxFiles == 0 {
return nil
}
Expand Down Expand Up @@ -124,7 +133,7 @@ func (w *LogFile) Cleanup() error {
sort.Ints(logFiles)

// too much log files ?
diffNB := len(logFiles) - w.GetConfig().Loggers.LogFile.MaxFiles
diffNB := len(logFiles) - (w.GetConfig().Loggers.LogFile.MaxFiles - 1)
if diffNB > 0 {
for i := 0; i < diffNB; i++ {
filename := fmt.Sprintf("%s-%d%s", w.filePrefix, logFiles[i], w.fileExt)
Expand All @@ -141,7 +150,8 @@ func (w *LogFile) Cleanup() error {
return nil
}

func (w *LogFile) OpenFile() error {
func (w *LogFile) OpenCurrentFile() error {
w.LogInfo("create new log file: %s", w.GetConfig().Loggers.LogFile.FilePath)

fd, err := os.OpenFile(w.GetConfig().Loggers.LogFile.FilePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
Expand Down Expand Up @@ -177,87 +187,79 @@ func (w *LogFile) OpenFile() error {

}

w.LogInfo("file opened with success: %s", w.GetConfig().Loggers.LogFile.FilePath)
w.LogInfo("new log file created")
return nil
}

func (w *LogFile) GetMaxSize() int64 {
return int64(1024*1024) * int64(w.GetConfig().Loggers.LogFile.MaxSize)
}

func (w *LogFile) CompressFile() {
entries, err := os.ReadDir(w.fileDir)
func (w *LogFile) compressFile(filename string) {
w.LogInfo("start to compress in background: %s", filename)

// prepare dest filename
baseName := filepath.Base(filename)
baseName = strings.TrimPrefix(baseName, "tocompress-")
tmpFile := filename + compressSuffix
dstFile := filepath.Join(filepath.Dir(filename), baseName+compressSuffix)

fd, err := os.Open(filename)
if err != nil {
w.LogError("unable to list all files: %s", err)
w.LogError("compress - failed to open file: %s", err)
return
}
defer fd.Close()

for _, entry := range entries {
// ignore folder
if entry.IsDir() {
continue
}

matched, _ := regexp.MatchString(`^`+w.filePrefix+`-\d+`+w.fileExt+`$`, entry.Name())
if matched {
src := filepath.Join(w.fileDir, entry.Name())
dst := filepath.Join(w.fileDir, entry.Name()+compressSuffix)

fd, err := os.Open(src)
if err != nil {
w.LogError("compress - failed to open file: ", err)
continue
}
defer fd.Close()

fi, err := os.Stat(src)
if err != nil {
w.LogError("compress - failed to stat file: ", err)
continue
}
fi, err := os.Stat(filename)
if err != nil {
w.LogError("compress - failed to stat file: %s", err)
return
}

gzf, err := os.OpenFile(dst, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fi.Mode())
if err != nil {
w.LogError("compress - failed to open compressed file: ", err)
continue
}
defer gzf.Close()
gzf, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fi.Mode())
if err != nil {
w.LogError("compress - failed to open compressed file: %s", err)
return
}
defer gzf.Close()

gz := gzip.NewWriter(gzf)
gz := gzip.NewWriter(gzf)

if _, err := io.Copy(gz, fd); err != nil {
w.LogError("compress - failed to compress file: ", err)
os.Remove(dst)
continue
}
if err := gz.Close(); err != nil {
w.LogError("compress - failed to close gz writer: ", err)
os.Remove(dst)
continue
}
if err := gzf.Close(); err != nil {
w.LogError("compress - failed to close gz file: ", err)
os.Remove(dst)
continue
}

if err := fd.Close(); err != nil {
w.LogError("compress - failed to close log file: ", err)
os.Remove(dst)
continue
}
if err := os.Remove(src); err != nil {
w.LogError("compress - failed to remove log file: ", err)
os.Remove(dst)
continue
}
if _, err := io.Copy(gz, fd); err != nil {
w.LogError("compress - failed to compress file: %s", err)
os.Remove(tmpFile)
return
}
if err := gz.Close(); err != nil {
w.LogError("compress - failed to close gz writer: %s", err)
os.Remove(tmpFile)
return
}
if err := gzf.Close(); err != nil {
w.LogError("compress - failed to close gz file: %s", err)
os.Remove(tmpFile)
return
}

// post rotate command?
w.CompressPostRotateCommand(dst)
}
if err := fd.Close(); err != nil {
w.LogError("compress - failed to close log file: %s", err)
os.Remove(tmpFile)
return
}
if err := os.Remove(filename); err != nil {
w.LogError("compress - failed to remove log file: %s", err)
os.Remove(tmpFile)
return
}

w.commpressTimer.Reset(time.Duration(w.GetConfig().Loggers.LogFile.CompressInterval) * time.Second)
// finally rename the gzip file
if err := os.Rename(tmpFile, dstFile); err != nil {
w.LogError("compress - unable to rename file: %s", err)
os.Remove(tmpFile)
return
}
w.LogInfo("compression terminated - %s", dstFile)
}

func (w *LogFile) PostRotateCommand(filename string) {
Expand Down Expand Up @@ -305,24 +307,34 @@ func (w *LogFile) RotateFile() error {
}

// Rename current log file
bfpath := filepath.Join(w.fileDir, fmt.Sprintf("%s-%d%s", w.filePrefix, time.Now().UnixNano(), w.fileExt))
newFilename := fmt.Sprintf("%s-%d%s", w.filePrefix, time.Now().UnixNano(), w.fileExt)
if w.config.Loggers.LogFile.Compress {
newFilename = fmt.Sprintf("tocompress-%s", newFilename)
}
bfpath := filepath.Join(w.fileDir, newFilename)
err := os.Rename(w.GetConfig().Loggers.LogFile.FilePath, bfpath)
if err != nil {
return err
}

// post rotate command?
w.PostRotateCommand(bfpath)
if w.config.Loggers.LogFile.Compress {
go func() {
w.compressQueue <- bfpath // Envoi asynchrone dans le canal pour compression
}()
} else {
w.PostRotateCommand(bfpath)
}

// keep only max files
err = w.Cleanup()
err = w.RemoveOldFiles()
if err != nil {
w.LogError("unable to cleanup log files: %s", err)
return err
}

// re-create new one
if err := w.OpenFile(); err != nil {
if err := w.OpenCurrentFile(); err != nil {
w.LogError("unable to re-create file: %s", err)
return err
}
Expand Down Expand Up @@ -399,6 +411,44 @@ func (w *LogFile) WriteToDnstap(data []byte) {
w.fileSize += int64(n)
}

func (w *LogFile) initializeCompressionQueue() {
// Get all files in the log directory
files, err := os.ReadDir(w.fileDir)
if err != nil {
w.LogError("error reading log directory: %v", err)
return
}

// Find files that start with "tocompress-"
for _, file := range files {
fileName := file.Name()

// Check if the file is both marked for compression and has a `.gz` suffix
if strings.HasPrefix(fileName, "tocompress-") && strings.HasSuffix(fileName, ".gz") {
// Build the full path of the file
fullPath := filepath.Join(w.fileDir, fileName)

// Attempt to remove incomplete .gz file
if err := os.Remove(fullPath); err != nil {
w.LogError("error deleting incomplete compressed file %s: %v", fileName, err)
}
continue
}

// If it's a pending compression file, add it to the compression queue
if strings.HasPrefix(fileName, "tocompress-") && !strings.HasSuffix(fileName, ".gz") {
fullPath := filepath.Join(w.fileDir, fileName)
w.compressQueue <- fullPath
}
}
}

func (w *LogFile) startCompressor() {
for filename := range w.compressQueue {
w.compressFile(filename)
}
}

func (w *LogFile) StartCollect() {
w.LogInfo("starting data collection")
defer w.CollectDone()
Expand Down Expand Up @@ -432,6 +482,7 @@ func (w *LogFile) StartCollect() {
w.LogInfo("input channel closed!")
return
}

// count global messages
w.CountIngressTraffic()

Expand All @@ -447,6 +498,7 @@ func (w *LogFile) StartCollect() {

// send to output channel
w.CountEgressTraffic()

w.GetOutputChannel() <- dm

// send to next ?
Expand All @@ -462,7 +514,6 @@ func (w *LogFile) StartLogging() {
// prepare some timers
flushInterval := time.Duration(w.GetConfig().Loggers.LogFile.FlushInterval) * time.Second
flushTimer := time.NewTimer(flushInterval)
w.commpressTimer = time.NewTimer(time.Duration(w.GetConfig().Loggers.LogFile.CompressInterval) * time.Second)

buffer := new(bytes.Buffer)
var data []byte
Expand All @@ -476,9 +527,10 @@ func (w *LogFile) StartLogging() {
for {
select {
case <-w.OnLoggerStopped():
close(w.compressQueue)

// stop timer
flushTimer.Stop()
w.commpressTimer.Stop()

// Force write remaining batch data
if batchSize > 0 {
Expand Down Expand Up @@ -585,11 +637,6 @@ func (w *LogFile) StartLogging() {
buffer.Reset()
flushTimer.Reset(flushInterval)

case <-w.commpressTimer.C:
if w.GetConfig().Loggers.LogFile.Compress {
w.CompressFile()
}

}
}
}
Loading