Skip to content

Commit

Permalink
add zap sampling to reduce timestamp warn logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ymtaye committed Jan 30, 2024
1 parent f2caf17 commit cb8c75c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
24 changes: 24 additions & 0 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package logger

import (
"io"
"os"
"time"

"github.com/influxdata/wlog"
"go.uber.org/zap"
Expand Down Expand Up @@ -94,6 +96,28 @@ func ConvertToLetterLevel(l zapcore.Level) string {
return string(l.CapitalString()[0])
}

// SampledLogger builds a zap logger that writes JSON-encoded entries at Info
// level and above to stdout, with output throttled by a zap sampling core.
//
// The sampler is configured with a one-hour tick, an initial allowance of 5
// entries and a "thereafter" value of 500: within each tick the first 5
// matching entries are emitted, after which only every 500th one is.
// Every call constructs a new core and sampler, so loggers returned by
// separate calls do not share sampling state.
func SampledLogger() *zap.Logger {
	const (
		sampleTick       = time.Hour // length of one sampling window
		sampleFirst      = 5         // entries always emitted per window
		sampleThereafter = 500       // after that, emit every 500th entry
	)

	// Start from the package's production encoder settings, then emit the
	// time under the "timestamp" key in ISO 8601 form.
	encCfg := newProductionEncoderConfig()
	encCfg.TimeKey = "timestamp"
	encCfg.EncodeTime = zapcore.ISO8601TimeEncoder

	core := zapcore.NewCore(
		zapcore.NewJSONEncoder(encCfg),
		zapcore.AddSync(os.Stdout),
		zap.NewAtomicLevelAt(zap.InfoLevel),
	)

	return zap.New(zapcore.NewSamplerWithOptions(core, sampleTick, sampleFirst, sampleThereafter))
}
func init() {
loggerLevel = zap.NewAtomicLevelAt(zapcore.InfoLevel)
}

15 changes: 11 additions & 4 deletions plugins/outputs/cloudwatchlogs/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package cloudwatchlogs

import (
cwaLogger "github.com/aws/amazon-cloudwatch-agent/logger"
"math/rand"
"sort"
"strconv"
"sync"
"time"

Expand All @@ -16,6 +18,8 @@ import (

"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/profiler"

"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -129,6 +133,8 @@ func hasValidTime(e logs.LogEvent) bool {
func (p *pusher) start() {
defer p.wg.Done()

sampledLogger := cwaLogger.SampledLogger()

ec := make(chan logs.LogEvent)

// Merge events from both blocking and non-blocking channel
Expand All @@ -154,7 +160,7 @@ func (p *pusher) start() {
p.resetFlushTimer()
}

ce := p.convertEvent(e)
ce := p.convertEvent(e, sampledLogger)
et := time.Unix(*ce.Timestamp/1000, *ce.Timestamp%1000) // Cloudwatch Log Timestamp is in Millisecond

// A batch of log events in a single request cannot span more than 24 hours.
Expand Down Expand Up @@ -410,7 +416,7 @@ func (p *pusher) resetFlushTimer() {
p.flushTimer.Reset(p.FlushTimeout)
}

func (p *pusher) convertEvent(e logs.LogEvent) *cloudwatchlogs.InputLogEvent {
func (p *pusher) convertEvent(e logs.LogEvent, sampledLogger *zap.Logger) *cloudwatchlogs.InputLogEvent {
message := e.Message()

if len(message) > msgSizeLimit {
Expand All @@ -424,9 +430,9 @@ func (p *pusher) convertEvent(e logs.LogEvent) *cloudwatchlogs.InputLogEvent {
// not have a timestamp.
t = p.lastValidTime
if !p.lastUpdateTime.IsZero() {
// Check when timestamp has an interval of 5 days.
// Check when timestamp has an interval of 1 days.
if time.Since(p.lastUpdateTime) > warnOldTimeStamp {
p.Log.Warnf("Unable to parse timestamp, using last valid timestamp found in the logs %v: which is at least older than 1 day for log group %v: ", p.lastValidTime, p.Group)
sampledLogger.Warn("Unable to parse timestamp, using last valid timestamp found in the logs " + strconv.Itoa(int(p.lastValidTime)) + ": which is at least older than 1 day for log group " + p.Group + ": ")
}
}
} else {
Expand Down Expand Up @@ -461,3 +467,4 @@ func (inputLogEvents ByTimestamp) Swap(i, j int) {
// Less reports whether the event at index i has a strictly smaller Timestamp
// than the event at index j. Both Timestamp pointers are dereferenced without
// a nil check, so a nil Timestamp at either index panics.
func (inputLogEvents ByTimestamp) Less(i, j int) bool {
	left := *inputLogEvents[i].Timestamp
	right := *inputLogEvents[j].Timestamp
	return left < right
}

0 comments on commit cb8c75c

Please sign in to comment.