From 973e659b4cdfb5b9897256342a6bd020d2428a56 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Wed, 27 Apr 2022 16:54:06 -0400 Subject: [PATCH 01/19] set 1mb threshold for file offset --- plugins/inputs/logfile/logfile.go | 7 ++++++ plugins/inputs/logfile/logfile_test.go | 33 +++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/logfile/logfile.go b/plugins/inputs/logfile/logfile.go index e1c7f48fbf..b248c19f6a 100644 --- a/plugins/inputs/logfile/logfile.go +++ b/plugins/inputs/logfile/logfile.go @@ -22,6 +22,8 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) +var offsetThreshold int64 = 1024 * 1024 // PutLogEvents API maxes out at 1mb for a payload + type LogFile struct { //array of file config for file to be monitored. FileConfig []FileConfig `toml:"file_config"` @@ -332,6 +334,11 @@ func (t *LogFile) restoreState(filename string) (int64, error) { return 0, err } + if offset < offsetThreshold { + t.Log.Errorf("Offset %d is less than the max size of a batch of logs sent to PLE. Publish from the beginning", offset) + offset = 0 + } + t.Log.Infof("Reading from offset %v in %s", offset, filename) return offset, nil diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index ebea61c5d4..fa1dca8462 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -198,7 +198,34 @@ func TestRestoreState(t *testing.T) { logFilePath := "/tmp/logfile.log" logFileStateFileName := "_tmp_logfile.log" - offset := int64(9323) + offset := offsetThreshold + 1 + err = ioutil.WriteFile( + tmpfolder+string(filepath.Separator)+logFileStateFileName, + []byte(strconv.FormatInt(offset, 10)+"\n"+logFilePath), + os.ModePerm) + require.NoError(t, err) + + tt := NewLogFile() + tt.Log = TestLogger{t} + tt.FileStateFolder = tmpfolder + roffset, err := tt.restoreState(logFilePath) + assert.Equal(t, offset, roffset, fmt.Sprintf("The actual offset is %d, different from the expected offset %d.", roffset, offset)) + tt.Stop() +} + +// TestRestoreStateReadFromBeginning ensures that if the offset stored in the state file +// is less than the defined offsetThreshold, we read from the beginning instead of upholding +// the stored offset. https://github.com/aws/amazon-cloudwatch-agent/issues/447 +func TestRestoreStateReadFromBeginning(t *testing.T) { + multilineWaitPeriod = 10 * time.Millisecond + tmpfolder, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(tmpfolder) + + logFilePath := "/tmp/logfile.log" + logFileStateFileName := "_tmp_logfile.log" + + offset := offsetThreshold + 1 err = ioutil.WriteFile( tmpfolder+string(filepath.Separator)+logFileStateFileName, []byte(strconv.FormatInt(offset, 10)+"\n"+logFilePath), @@ -618,6 +645,7 @@ func TestLogsFileTruncate(t *testing.T) { func TestLogsFileWithOffset(t *testing.T) { multilineWaitPeriod = 10 * time.Millisecond + offsetThreshold = 10 logEntryString := "xxxxxxxxxxContentAfterOffset" tmpfile, err := createTempFile("", "") @@ -668,6 +696,7 @@ func TestLogsFileWithOffset(t *testing.T) { func TestLogsFileWithInvalidOffset(t *testing.T) { multilineWaitPeriod = 10 * time.Millisecond + offsetThreshold = 10 logEntryString := "xxxxxxxxxxContentAfterOffset" tmpfile, err := createTempFile("", "") @@ -720,6 +749,8 @@ func TestLogsFileWithInvalidOffset(t *testing.T) { // at that same offset in the state file. func TestLogsFileRecreate(t *testing.T) { multilineWaitPeriod = 10 * time.Millisecond + offsetThreshold = 10 + logEntryString := "xxxxxxxxxxContentAfterOffset" expectedContent := "ContentAfterOffset" From fb1032e137745cc64a73a4307870c8b6daf864ad Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Wed, 27 Apr 2022 17:13:19 -0400 Subject: [PATCH 02/19] change log to debug level --- plugins/inputs/logfile/logfile.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/logfile/logfile.go b/plugins/inputs/logfile/logfile.go index b248c19f6a..6da141eb14 100644 --- a/plugins/inputs/logfile/logfile.go +++ b/plugins/inputs/logfile/logfile.go @@ -335,7 +335,7 @@ func (t *LogFile) restoreState(filename string) (int64, error) { } if offset < offsetThreshold { - t.Log.Errorf("Offset %d is less than the max size of a batch of logs sent to PLE. Publish from the beginning", offset) + t.Log.Debugf("Offset %d is less than allowed %d for publishing logs. Reset to zero offset", offset, offsetThreshold) offset = 0 } From 49b9d0b58e3fdc2f8fd58c471a5d796a9dae45de Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Wed, 27 Apr 2022 20:42:47 -0400 Subject: [PATCH 03/19] delete statefile when log file is deleted --- plugins/inputs/logfile/logfile.go | 21 +++++++++++++++++---- plugins/inputs/logfile/logfile_test.go | 4 +++- plugins/inputs/logfile/tail/tail.go | 12 +++++++++--- plugins/inputs/logfile/tailersrc.go | 8 ++++++++ 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/logfile/logfile.go b/plugins/inputs/logfile/logfile.go index 6da141eb14..752d97108f 100644 --- a/plugins/inputs/logfile/logfile.go +++ b/plugins/inputs/logfile/logfile.go @@ -252,6 +252,17 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc { } }(src)) + //src.AddCleanUpFn(func(ts *tailerSrc) func() { + // return func() { + // select { + // case closed := <-ts.tailer.FileDeletedCh: + // if closed { + // os.Remove(ts.stateFilePath) + // } + // default: + // } + // } + //}(src)) srcs = append(srcs, src) @@ -334,10 +345,12 @@ func (t *LogFile) restoreState(filename string) (int64, error) { return 0, err } - if offset < offsetThreshold { - t.Log.Debugf("Offset %d is less than allowed %d for publishing logs. Reset to zero offset", offset, offsetThreshold) - offset = 0 - } + // https://github.com/aws/amazon-cloudwatch-agent/issues/447 + // try to not drop the first log in a log file when rotated by reasoning that if the + //if offset < offsetThreshold { + // t.Log.Debugf("Offset %d is less than allowed %d for publishing logs. Reset to zero offset", offset, offsetThreshold) + // offset = 0 + //} t.Log.Infof("Reading from offset %v in %s", offset, filename) diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index fa1dca8462..c662cd2086 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -831,8 +831,10 @@ func TestLogsFileRecreate(t *testing.T) { } }) + // after the file gets deleted, the state file should be deleted too, so + // the tailer should start from the beginning. e = <-evts - if e.Message() != expectedContent { + if e.Message() != logEntryString { t.Errorf("Wrong log found after file replacement: \n% x\nExpecting:\n% x\n", e.Message(), expectedContent) } diff --git a/plugins/inputs/logfile/tail/tail.go b/plugins/inputs/logfile/tail/tail.go index b270e197e4..a9a1b50786 100644 --- a/plugins/inputs/logfile/tail/tail.go +++ b/plugins/inputs/logfile/tail/tail.go @@ -83,6 +83,8 @@ type Tail struct { dropCnt int lk sync.Mutex + + FileDeletedCh chan bool } // TailFile begins tailing the file. Output stream is made available @@ -95,9 +97,10 @@ func TailFile(filename string, config Config) (*Tail, error) { } t := &Tail{ - Filename: filename, - Lines: make(chan *Line), - Config: config, + Filename: filename, + Lines: make(chan *Line), + Config: config, + FileDeletedCh: make(chan bool), } // when Logger was not specified in config, create new one @@ -169,6 +172,9 @@ func (tail *Tail) close() { if tail.dropCnt > 0 { tail.Logger.Errorf("Dropped %v lines for stopped tail for file %v", tail.dropCnt, tail.Filename) } + if tail.isFileDeleted() { + close(tail.FileDeletedCh) + } close(tail.Lines) tail.closeFile() } diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 858a869ad6..e6501363b8 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -280,6 +280,14 @@ func (ts *tailerSrc) cleanUp() { for _, clf := range ts.cleanUpFns { clf() } + + // delete state file if log file was closed + select { + case <-ts.tailer.FileDeletedCh: + os.Remove(ts.stateFilePath) + default: + } + if ts.outputFn != nil { ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest } From 74c836f4e0da2ca6757027967130d1b30e89af62 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Wed, 27 Apr 2022 20:55:25 -0400 Subject: [PATCH 04/19] close channel earlier --- plugins/inputs/logfile/tail/tail.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/plugins/inputs/logfile/tail/tail.go b/plugins/inputs/logfile/tail/tail.go index a9a1b50786..a6e866095f 100644 --- a/plugins/inputs/logfile/tail/tail.go +++ b/plugins/inputs/logfile/tail/tail.go @@ -172,9 +172,6 @@ func (tail *Tail) close() { if tail.dropCnt > 0 { tail.Logger.Errorf("Dropped %v lines for stopped tail for file %v", tail.dropCnt, tail.Filename) } - if tail.isFileDeleted() { - close(tail.FileDeletedCh) - } close(tail.Lines) tail.closeFile() } @@ -391,6 +388,7 @@ func (tail *Tail) tailFileSync() { err := tail.waitForChanges() if err != nil { if err == ErrDeletedNotReOpen { + close(tail.FileDeletedCh) for { line, errReadLine := tail.readLine() if errReadLine == nil { From 706098766de49a0be150b705b181c00ecc9dd759 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Wed, 27 Apr 2022 21:18:25 -0400 Subject: [PATCH 05/19] change hook where statefile gets deleted --- plugins/inputs/logfile/tailersrc.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index e6501363b8..7fd49692d8 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -281,12 +281,12 @@ func (ts *tailerSrc) cleanUp() { clf() } - // delete state file if log file was closed - select { - case <-ts.tailer.FileDeletedCh: - os.Remove(ts.stateFilePath) - default: - } + //// delete state file if log file was closed + //select { + //case <-ts.tailer.FileDeletedCh: + // os.Remove(ts.stateFilePath) + //default: + //} if ts.outputFn != nil { ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest @@ -314,6 +314,8 @@ func (ts *tailerSrc) runSaveState() { continue } lastSavedOffset = offset + case <-ts.tailer.FileDeletedCh: + os.Remove(ts.stateFilePath) case <-ts.done: err := ts.saveState(offset.offset) if err != nil { From 98f6f86fb0d43ed26ba3606c3de09b95acd1e3db Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Wed, 27 Apr 2022 21:33:32 -0400 Subject: [PATCH 06/19] add log --- plugins/inputs/logfile/tailersrc.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 7fd49692d8..4d4cf9da65 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -297,6 +297,7 @@ func (ts *tailerSrc) runSaveState() { t := time.NewTicker(100 * time.Millisecond) defer t.Stop() + var deletedStateFile bool var offset, lastSavedOffset fileOffset for { select { @@ -315,7 +316,13 @@ func (ts *tailerSrc) runSaveState() { } lastSavedOffset = offset case <-ts.tailer.FileDeletedCh: - os.Remove(ts.stateFilePath) + if !deletedStateFile { + log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath) + err := os.Remove(ts.stateFilePath) + if err == nil { + deletedStateFile = true + } + } case <-ts.done: err := ts.saveState(offset.offset) if err != nil { From 17d05cdee6e225a395032f9d45023afe008dbb8e Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Wed, 27 Apr 2022 21:48:30 -0400 Subject: [PATCH 07/19] return --- plugins/inputs/logfile/tailersrc.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 4d4cf9da65..4739258dcc 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -297,7 +297,6 @@ func (ts *tailerSrc) runSaveState() { t := time.NewTicker(100 * time.Millisecond) defer t.Stop() - var deletedStateFile bool var offset, lastSavedOffset fileOffset for { select { @@ -316,13 +315,9 @@ func (ts *tailerSrc) runSaveState() { } lastSavedOffset = offset case <-ts.tailer.FileDeletedCh: - if !deletedStateFile { - log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath) - err := os.Remove(ts.stateFilePath) - if err == nil { - deletedStateFile = true - } - } + log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath) + os.Remove(ts.stateFilePath) + return case <-ts.done: err := ts.saveState(offset.offset) if err != nil { From 2233ae9742804f8d4f1f5cb4693db58371ba63c8 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 10:05:09 -0400 Subject: [PATCH 08/19] add integration test --- integration/terraform/ec2/linux/README.md | 4 +- integration/test/agent_util.go | 8 +++ .../test/cloudwatchlogs/publish_logs_test.go | 40 +++++++++++-- .../resources/config_log_rotated.json | 19 ++++++ .../resources/write_and_rotate_logs.py | 28 +++++++++ integration/test/cwl_util.go | 60 +++++++++++++++++++ 6 files changed, 153 insertions(+), 6 deletions(-) create mode 100644 integration/test/cloudwatchlogs/resources/config_log_rotated.json create mode 100644 integration/test/cloudwatchlogs/resources/write_and_rotate_logs.py diff --git a/integration/terraform/ec2/linux/README.md b/integration/terraform/ec2/linux/README.md index 1ddea73f07..071730e762 100644 --- a/integration/terraform/ec2/linux/README.md +++ b/integration/terraform/ec2/linux/README.md @@ -312,4 +312,6 @@ terraform destroy --auto-approve 6. make 7. aws-cli 8. CloudWatchAgentServerRole is attached -9. crontab \ No newline at end of file +9. crontab +10. gcc +11. python3 diff --git a/integration/test/agent_util.go b/integration/test/agent_util.go index 84ffe7b814..dd10fdded9 100644 --- a/integration/test/agent_util.go +++ b/integration/test/agent_util.go @@ -90,6 +90,14 @@ func RunShellScript(path string, args ...string) { } } +func RunCommand(cmd string) { + out, err := exec.Command("bash", "-c", cmd).Output() + + if err != nil { + log.Fatalf("Error occurred when executing %s: %s | %s", cmd, err.Error(), string(out)) + } +} + func ReplaceLocalStackHostName(pathIn string) { out, err := exec.Command("bash", "-c", "sed -i 's/localhost.localstack.cloud/'\"$LOCAL_STACK_HOST_NAME\"'/g' "+pathIn).Output() diff --git a/integration/test/cloudwatchlogs/publish_logs_test.go b/integration/test/cloudwatchlogs/publish_logs_test.go index db777162db..78c5586dd8 100644 --- a/integration/test/cloudwatchlogs/publish_logs_test.go +++ b/integration/test/cloudwatchlogs/publish_logs_test.go @@ -11,6 +11,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/integration/test" "log" "os" + "strings" "testing" "time" @@ -54,7 +55,7 @@ func TestWriteLogsToCloudWatch(t *testing.T) { instanceId := test.GetInstanceId() log.Printf("Found instance id %s", instanceId) - defer cleanUp(instanceId) + defer test.DeleteLogGroupAndStream(instanceId, instanceId) for _, param := range testParameters { t.Run(param.testName, func(t *testing.T) { @@ -77,6 +78,39 @@ func TestWriteLogsToCloudWatch(t *testing.T) { } } +// Validate https://github.com/aws/amazon-cloudwatch-agent/issues/447 +func TestRotatingLogsDoesNotSkipLines(t *testing.T) { + cfgFilePath := "resources/config_log_rotated.json" + + instanceId := test.GetInstanceId() + log.Printf("Found instance id %s", instanceId) + logGroup := instanceId + logStream := instanceId + "Rotated" + + defer test.DeleteLogGroupAndStream(logGroup, logStream) + + start := time.Now() + test.CopyFile(cfgFilePath, configOutputPath) + + test.StartAgent(configOutputPath) + + // ensure that there is enough time from the "start" time and the first log line, + // so we don't miss it in the GetLogEvents call + time.Sleep(agentRuntime) + t.Log("Writing logs and rotating") + // execute the script used in the repro case + test.RunCommand("/usr/bin/python3 resources/write_and_rotate_logs.py") + time.Sleep(agentRuntime) + test.StopAgent() + + lines := []string{ + fmt.Sprintf("{\"Metric\": \"%s\"}", strings.Repeat("12345", 10)), + fmt.Sprintf("{\"Metric\": \"%s\"}", strings.Repeat("09876", 10)), + fmt.Sprintf("{\"Metric\": \"%s\"}", strings.Repeat("1234567890", 10)), + } + test.ValidateLogsInOrder(t, logGroup, logStream, lines, start) +} + func writeLogs(t *testing.T, filePath string, iterations int) { f, err := os.Create(filePath) if err != nil { @@ -99,7 +133,3 @@ func writeLogs(t *testing.T, filePath string, iterations int) { time.Sleep(1 * time.Millisecond) } } - -func cleanUp(instanceId string) { - test.DeleteLogGroupAndStream(instanceId, instanceId) -} diff --git a/integration/test/cloudwatchlogs/resources/config_log_rotated.json b/integration/test/cloudwatchlogs/resources/config_log_rotated.json new file mode 100644 index 0000000000..6d19e614df --- /dev/null +++ b/integration/test/cloudwatchlogs/resources/config_log_rotated.json @@ -0,0 +1,19 @@ +{ + "agent": { + "run_as_user": "root" + }, + "logs": { + "logs_collected": { + "files": { + "collect_list": [ + { + "file_path": "/tmp/rotate_me.log*", + "log_group_name": "{instance_id}", + "log_stream_name": "{instance_id}Rotated", + "timezone": "UTC" + } + ] + } + } + } +} \ No newline at end of file diff --git a/integration/test/cloudwatchlogs/resources/write_and_rotate_logs.py b/integration/test/cloudwatchlogs/resources/write_and_rotate_logs.py new file mode 100644 index 0000000000..097ccd2d33 --- /dev/null +++ b/integration/test/cloudwatchlogs/resources/write_and_rotate_logs.py @@ -0,0 +1,28 @@ +""" +Lifted this from https://github.com/aws/amazon-cloudwatch-agent/issues/447 +because I was not able to adequately reproduce the issue natively in Go, +directly in the integration test code. +""" +import json +import logging +import time +from logging.handlers import TimedRotatingFileHandler + +# get root logger +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +# rotate our log file every 10 seconds +handler = TimedRotatingFileHandler("/tmp/rotate_me.log", when="S", interval=10) +logger.addHandler(handler) + +# log a message +logging.info(json.dumps({"Metric": "12345"*10})) +# sleep so that file will rotate upon next log message +time.sleep(15) +# log another message (this one will not appear since byte length of message == byte length of old log file) +logging.info(json.dumps({"Metric": "09876"*10})) +# sleep again so that file will rotate upon next log message +time.sleep(15) +# this message will be partially written +logging.info({"Metric": "1234567890"*10}) diff --git a/integration/test/cwl_util.go b/integration/test/cwl_util.go index 4a70c173eb..d75489c575 100644 --- a/integration/test/cwl_util.go +++ b/integration/test/cwl_util.go @@ -10,6 +10,7 @@ import ( "context" "errors" "log" + "strings" "testing" "time" @@ -104,6 +105,65 @@ func DeleteLogGroupAndStream(logGroupName, logStreamName string) { } } +// ValidateLogsInOrder takes a log group, log stream, a list of specific log lines and a timestamp. +// It should query the given log stream for log events, and then confirm that the log lines that are +// returned match the expected log lines. This also sanitizes the log lines from both the output and +// the expected lines input to ensure that they don't diverge in JSON representation (" vs ') +func ValidateLogsInOrder(t *testing.T, logGroup, logStream string, logLines []string, since time.Time) { + log.Printf("Checking %s/%s since %s for %d expected logs", logGroup, logStream, since.UTC().Format(time.RFC3339), len(logLines)) + cwlClient, clientContext, err := getCloudWatchLogsClient() + if err != nil { + t.Fatalf("Error occurred while creating CloudWatch Logs SDK client: %v", err.Error()) + } + + sinceMs := since.UnixNano() / 1e6 // convert to millisecond timestamp + + // https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_GetLogEvents.html + // GetLogEvents can return an empty result while still having more log events on a subsequent page, + // so rather than expecting all the events to show up in one GetLogEvents API call, we need to paginate. + params := &cloudwatchlogs.GetLogEventsInput{ + LogGroupName: aws.String(logGroup), + LogStreamName: aws.String(logStream), + StartTime: aws.Int64(sinceMs), + StartFromHead: aws.Bool(true), // read from the beginning + } + + foundLogs := make([]string, 0) + var output *cloudwatchlogs.GetLogEventsOutput + var nextToken *string + + for { + if nextToken != nil { + params.NextToken = nextToken + } + output, err = cwlClient.GetLogEvents(*clientContext, params) + + if err != nil { + t.Fatalf("Error occurred while getting log events: %v", err.Error()) + } + + for _, e := range output.Events { + foundLogs = append(foundLogs, *e.Message) + } + + if nextToken != nil && output.NextForwardToken != nil && *output.NextForwardToken == *nextToken { + // From the docs: If you have reached the end of the stream, it returns the same token you passed in. + log.Printf("Done paginating log events for %s/%s and found %d logs", logGroup, logStream, len(foundLogs)) + break + } + + nextToken = output.NextForwardToken + } + + // Validate that each of the logs are found, in order and in full. + assert.Len(t, foundLogs, len(logLines)) + for i := 0; i < len(logLines); i++ { + expected := strings.ReplaceAll(logLines[i], "'", "\"") + actual := strings.ReplaceAll(foundLogs[i], "'", "\"") + assert.Equal(t, expected, actual) + } +} + // getCloudWatchLogsClient returns a singleton SDK client for interfacing with CloudWatch Logs func getCloudWatchLogsClient() (*cloudwatchlogs.Client, *context.Context, error) { if cwl == nil { From f28965ecf43ee446e97657d85f958fe1c42608ac Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 10:41:39 -0400 Subject: [PATCH 09/19] code cleanup --- plugins/inputs/logfile/logfile.go | 9 ------- plugins/inputs/logfile/logfile_test.go | 36 +++----------------------- plugins/inputs/logfile/tailersrc.go | 7 ----- 3 files changed, 3 insertions(+), 49 deletions(-) diff --git a/plugins/inputs/logfile/logfile.go b/plugins/inputs/logfile/logfile.go index 752d97108f..126f88ee2d 100644 --- a/plugins/inputs/logfile/logfile.go +++ b/plugins/inputs/logfile/logfile.go @@ -22,8 +22,6 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) -var offsetThreshold int64 = 1024 * 1024 // PutLogEvents API maxes out at 1mb for a payload - type LogFile struct { //array of file config for file to be monitored. FileConfig []FileConfig `toml:"file_config"` @@ -345,13 +343,6 @@ func (t *LogFile) restoreState(filename string) (int64, error) { return 0, err } - // https://github.com/aws/amazon-cloudwatch-agent/issues/447 - // try to not drop the first log in a log file when rotated by reasoning that if the - //if offset < offsetThreshold { - // t.Log.Debugf("Offset %d is less than allowed %d for publishing logs. Reset to zero offset", offset, offsetThreshold) - // offset = 0 - //} - t.Log.Infof("Reading from offset %v in %s", offset, filename) return offset, nil diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index c662cd2086..279a674c52 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -198,34 +198,7 @@ func TestRestoreState(t *testing.T) { logFilePath := "/tmp/logfile.log" logFileStateFileName := "_tmp_logfile.log" - offset := offsetThreshold + 1 - err = ioutil.WriteFile( - tmpfolder+string(filepath.Separator)+logFileStateFileName, - []byte(strconv.FormatInt(offset, 10)+"\n"+logFilePath), - os.ModePerm) - require.NoError(t, err) - - tt := NewLogFile() - tt.Log = TestLogger{t} - tt.FileStateFolder = tmpfolder - roffset, err := tt.restoreState(logFilePath) - assert.Equal(t, offset, roffset, fmt.Sprintf("The actual offset is %d, different from the expected offset %d.", roffset, offset)) - tt.Stop() -} - -// TestRestoreStateReadFromBeginning ensures that if the offset stored in the state file -// is less than the defined offsetThreshold, we read from the beginning instead of upholding -// the stored offset. https://github.com/aws/amazon-cloudwatch-agent/issues/447 -func TestRestoreStateReadFromBeginning(t *testing.T) { - multilineWaitPeriod = 10 * time.Millisecond - tmpfolder, err := ioutil.TempDir("", "") - require.NoError(t, err) - defer os.RemoveAll(tmpfolder) - - logFilePath := "/tmp/logfile.log" - logFileStateFileName := "_tmp_logfile.log" - - offset := offsetThreshold + 1 + offset := int64(9323) err = ioutil.WriteFile( tmpfolder+string(filepath.Separator)+logFileStateFileName, []byte(strconv.FormatInt(offset, 10)+"\n"+logFilePath), @@ -645,7 +618,6 @@ func TestLogsFileTruncate(t *testing.T) { func TestLogsFileWithOffset(t *testing.T) { multilineWaitPeriod = 10 * time.Millisecond - offsetThreshold = 10 logEntryString := "xxxxxxxxxxContentAfterOffset" tmpfile, err := createTempFile("", "") @@ -696,7 +668,6 @@ func TestLogsFileWithOffset(t *testing.T) { func TestLogsFileWithInvalidOffset(t *testing.T) { multilineWaitPeriod = 10 * time.Millisecond - offsetThreshold = 10 logEntryString := "xxxxxxxxxxContentAfterOffset" tmpfile, err := createTempFile("", "") @@ -745,11 +716,10 @@ func TestLogsFileWithInvalidOffset(t *testing.T) { // TestLogsFileRecreate verifies that if a LogSrc matching a LogConfig is detected, // We only receive log lines beginning at the offset specified in the corresponding state-file. -// And if the file happens to get deleted and recreated we expect to receive log lines beginning -// at that same offset in the state file. +// And if the file happens to get deleted and recreated we expect to receive log lines +// from the beginning of the file. See https://github.com/aws/amazon-cloudwatch-agent/issues/447 func TestLogsFileRecreate(t *testing.T) { multilineWaitPeriod = 10 * time.Millisecond - offsetThreshold = 10 logEntryString := "xxxxxxxxxxContentAfterOffset" expectedContent := "ContentAfterOffset" diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 4739258dcc..51c0f44166 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -281,13 +281,6 @@ func (ts *tailerSrc) cleanUp() { clf() } - //// delete state file if log file was closed - //select { - //case <-ts.tailer.FileDeletedCh: - // os.Remove(ts.stateFilePath) - //default: - //} - if ts.outputFn != nil { ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest } From 3c9a4cdd98a2acc59bb7e85284ef576641262456 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 10:55:14 -0400 Subject: [PATCH 10/19] code cleanup --- plugins/inputs/logfile/logfile.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/plugins/inputs/logfile/logfile.go b/plugins/inputs/logfile/logfile.go index 126f88ee2d..e1c7f48fbf 100644 --- a/plugins/inputs/logfile/logfile.go +++ b/plugins/inputs/logfile/logfile.go @@ -250,17 +250,6 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc { } }(src)) - //src.AddCleanUpFn(func(ts *tailerSrc) func() { - // return func() { - // select { - // case closed := <-ts.tailer.FileDeletedCh: - // if closed { - // os.Remove(ts.stateFilePath) - // } - // default: - // } - // } - //}(src)) srcs = append(srcs, src) From fe79755f3ee0df8ad4f082e8637c2f2bbcfe4bb1 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 11:06:29 -0400 Subject: [PATCH 11/19] debug for windows test failure --- plugins/inputs/logfile/tailersrc.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 51c0f44166..8220f25232 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -309,7 +309,10 @@ func (ts *tailerSrc) runSaveState() { lastSavedOffset = offset case <-ts.tailer.FileDeletedCh: log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath) - os.Remove(ts.stateFilePath) + err := os.Remove(ts.stateFilePath) + if err != nil { + log.Printf("E! [logfile] error occurred deleting state file %s: %v", ts.stateFilePath, err) + } return case <-ts.done: err := ts.saveState(offset.offset) From 0cdd0e6f4cfd2d5a134b50e4efbd255227e454ef Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 11:18:58 -0400 Subject: [PATCH 12/19] account for Windows file management --- plugins/inputs/logfile/tailersrc.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 8220f25232..6da8330a1c 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -18,8 +18,10 @@ import ( ) const ( - stateFileMode = 0644 - bufferLimit = 50 + stateFileMode = 0644 + bufferLimit = 50 + fileDeleteMaxRetries = 5 + fileDeleteWaitPeriod = 10 * time.Millisecond ) var ( @@ -290,6 +292,8 @@ func (ts *tailerSrc) runSaveState() { t := time.NewTicker(100 * time.Millisecond) defer t.Stop() + waitDuration := fileDeleteWaitPeriod + var offset, lastSavedOffset fileOffset for { select { @@ -309,10 +313,18 @@ func (ts *tailerSrc) runSaveState() { lastSavedOffset = offset case <-ts.tailer.FileDeletedCh: log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath) - err := os.Remove(ts.stateFilePath) - if err != nil { - log.Printf("E! [logfile] error occurred deleting state file %s: %v", ts.stateFilePath, err) + for i := 0; i < fileDeleteMaxRetries; i++ { + err := os.Remove(ts.stateFilePath) + if err == nil { + return + } + log.Printf("W! [logfile] error occurred deleting state file %s: %v", ts.stateFilePath, err) + time.Sleep(waitDuration) + + waitDuration *= 2 } + // reaching here means we exhausted retries on deleting the state file + log.Printf("E! [logfile] failed to delete state file %s", ts.stateFilePath) return case <-ts.done: err := ts.saveState(offset.offset) From ce7b1b1dfe90c7bc078fb878d93b8b7528e23299 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 12:04:38 -0400 Subject: [PATCH 13/19] add more debug --- plugins/inputs/logfile/logfile_test.go | 2 +- plugins/inputs/logfile/tailersrc.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index 279a674c52..d8be4d4a07 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -805,7 +805,7 @@ func TestLogsFileRecreate(t *testing.T) { // the tailer should start from the beginning. e = <-evts if e.Message() != logEntryString { - t.Errorf("Wrong log found after file replacement: \n% x\nExpecting:\n% x\n", e.Message(), expectedContent) + t.Errorf("Wrong log found after file replacement: \n% x\nExpecting:\n% x\n", e.Message(), logEntryString) } lsrc.Stop() diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 6da8330a1c..45b8094dd2 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -305,6 +305,7 @@ func (ts *tailerSrc) runSaveState() { if offset == lastSavedOffset { continue } + log.Printf("E! [logfile] saving to state file because of ticker") err := ts.saveState(offset.offset) if err != nil { log.Printf("E! [logfile] Error happened when saving file state %s to file state folder %s: %v", ts.tailer.Filename, ts.stateFilePath, err) @@ -327,6 +328,7 @@ func (ts *tailerSrc) runSaveState() { log.Printf("E! [logfile] failed to delete state file %s", ts.stateFilePath) return case <-ts.done: + log.Printf("E! [logfile] saving to state file because channel closed") err := ts.saveState(offset.offset) if err != nil { log.Printf("E! [logfile] Error happened during final file state saving of logfile %s to file state folder %s, duplicate log maybe sent at next start: %v", ts.tailer.Filename, ts.stateFilePath, err) From 7cac8c1fcc97c537ca14fc0aa6b25a2796fc502a Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 12:18:10 -0400 Subject: [PATCH 14/19] update state file permissions to match ioutil --- plugins/inputs/logfile/logfile_test.go | 2 +- plugins/inputs/logfile/tailersrc.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index d8be4d4a07..ffa40ae832 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -735,7 +735,7 @@ func TestLogsFileRecreate(t *testing.T) { defer os.Remove(stateDir) stateFileName := filepath.Join(stateDir, escapeFilePath(tmpfile.Name())) - stateFile, err := os.OpenFile(stateFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600) + stateFile, err := os.OpenFile(stateFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) require.NoError(t, err) _, err = stateFile.WriteString("10") defer os.Remove(stateFileName) diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 45b8094dd2..6da8330a1c 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -305,7 +305,6 @@ func (ts *tailerSrc) runSaveState() { if offset == lastSavedOffset { continue } - log.Printf("E! [logfile] saving to state file because of ticker") err := ts.saveState(offset.offset) if err != nil { log.Printf("E! [logfile] Error happened when saving file state %s to file state folder %s: %v", ts.tailer.Filename, ts.stateFilePath, err) @@ -328,7 +327,6 @@ func (ts *tailerSrc) runSaveState() { log.Printf("E! [logfile] failed to delete state file %s", ts.stateFilePath) return case <-ts.done: - log.Printf("E! [logfile] saving to state file because channel closed") err := ts.saveState(offset.offset) if err != nil { log.Printf("E! [logfile] Error happened during final file state saving of logfile %s to file state folder %s, duplicate log maybe sent at next start: %v", ts.tailer.Filename, ts.stateFilePath, err) From ed18fba35c9b868094bd91ef54bb12ea983cdff7 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 12:27:07 -0400 Subject: [PATCH 15/19] Revert "update state file permissions to match ioutil" This reverts commit 7cac8c1fcc97c537ca14fc0aa6b25a2796fc502a. --- plugins/inputs/logfile/logfile_test.go | 2 +- plugins/inputs/logfile/tailersrc.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index ffa40ae832..d8be4d4a07 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -735,7 +735,7 @@ func TestLogsFileRecreate(t *testing.T) { defer os.Remove(stateDir) stateFileName := filepath.Join(stateDir, escapeFilePath(tmpfile.Name())) - stateFile, err := os.OpenFile(stateFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + stateFile, err := os.OpenFile(stateFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600) require.NoError(t, err) _, err = stateFile.WriteString("10") defer os.Remove(stateFileName) diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 6da8330a1c..45b8094dd2 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -305,6 +305,7 @@ func (ts *tailerSrc) runSaveState() { if offset == lastSavedOffset { continue } + log.Printf("E! [logfile] saving to state file because of ticker") err := ts.saveState(offset.offset) if err != nil { log.Printf("E! [logfile] Error happened when saving file state %s to file state folder %s: %v", ts.tailer.Filename, ts.stateFilePath, err) @@ -327,6 +328,7 @@ func (ts *tailerSrc) runSaveState() { log.Printf("E! [logfile] failed to delete state file %s", ts.stateFilePath) return case <-ts.done: + log.Printf("E! [logfile] saving to state file because channel closed") err := ts.saveState(offset.offset) if err != nil { log.Printf("E! [logfile] Error happened during final file state saving of logfile %s to file state folder %s, duplicate log maybe sent at next start: %v", ts.tailer.Filename, ts.stateFilePath, err) From 2ded347675d38bdd727885bc6c8830dd50355f2f Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 12:29:31 -0400 Subject: [PATCH 16/19] close state file in test --- plugins/inputs/logfile/logfile_test.go | 5 ++++- plugins/inputs/logfile/tailersrc.go | 29 +++++++++++++------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index d8be4d4a07..42530cf6b2 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -735,9 +735,12 @@ func TestLogsFileRecreate(t *testing.T) { defer os.Remove(stateDir) stateFileName := filepath.Join(stateDir, escapeFilePath(tmpfile.Name())) - stateFile, err := os.OpenFile(stateFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600) + stateFile, err := os.OpenFile(stateFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644) require.NoError(t, err) _, err = stateFile.WriteString("10") + require.NoError(t, err) + err = stateFile.Close() + require.NoError(t, err) defer os.Remove(stateFileName) tt := NewLogFile() diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index 45b8094dd2..a3ecdfb982 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -292,7 +292,7 @@ func (ts *tailerSrc) runSaveState() { t := time.NewTicker(100 * time.Millisecond) defer t.Stop() - waitDuration := fileDeleteWaitPeriod + //waitDuration := fileDeleteWaitPeriod var offset, lastSavedOffset fileOffset for { @@ -305,7 +305,6 @@ func (ts *tailerSrc) runSaveState() { if offset == lastSavedOffset { continue } - log.Printf("E! [logfile] saving to state file because of ticker") err := ts.saveState(offset.offset) if err != nil { log.Printf("E! [logfile] Error happened when saving file state %s to file state folder %s: %v", ts.tailer.Filename, ts.stateFilePath, err) @@ -314,21 +313,21 @@ func (ts *tailerSrc) runSaveState() { lastSavedOffset = offset case <-ts.tailer.FileDeletedCh: log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath) - for i := 0; i < fileDeleteMaxRetries; i++ { - err := os.Remove(ts.stateFilePath) - if err == nil { - return - } - log.Printf("W! [logfile] error occurred deleting state file %s: %v", ts.stateFilePath, err) - time.Sleep(waitDuration) - - waitDuration *= 2 - } - // reaching here means we exhausted retries on deleting the state file - log.Printf("E! [logfile] failed to delete state file %s", ts.stateFilePath) + os.Remove(ts.stateFilePath) + //for i := 0; i < fileDeleteMaxRetries; i++ { + // err := os.Remove(ts.stateFilePath) + // if err == nil { + // return + // } + // log.Printf("W! [logfile] error occurred deleting state file %s: %v", ts.stateFilePath, err) + // time.Sleep(waitDuration) + // + // waitDuration *= 2 + //} + //// reaching here means we exhausted retries on deleting the state file + //log.Printf("E! [logfile] failed to delete state file %s", ts.stateFilePath) return case <-ts.done: - log.Printf("E! [logfile] saving to state file because channel closed") err := ts.saveState(offset.offset) if err != nil { log.Printf("E! [logfile] Error happened during final file state saving of logfile %s to file state folder %s, duplicate log maybe sent at next start: %v", ts.tailer.Filename, ts.stateFilePath, err) From 363b4bf79fd21f646aa6f4a057ac02211bf111dd Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 12:41:48 -0400 Subject: [PATCH 17/19] code cleanup --- plugins/inputs/logfile/tailersrc.go | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/plugins/inputs/logfile/tailersrc.go b/plugins/inputs/logfile/tailersrc.go index a3ecdfb982..178f824041 100644 --- a/plugins/inputs/logfile/tailersrc.go +++ b/plugins/inputs/logfile/tailersrc.go @@ -18,10 +18,8 @@ import ( ) const ( - stateFileMode = 0644 - bufferLimit = 50 - fileDeleteMaxRetries = 5 - fileDeleteWaitPeriod = 10 * time.Millisecond + stateFileMode = 0644 + bufferLimit = 50 ) var ( @@ -292,8 +290,6 @@ func (ts *tailerSrc) runSaveState() { t := time.NewTicker(100 * time.Millisecond) defer t.Stop() - //waitDuration := fileDeleteWaitPeriod - var offset, lastSavedOffset fileOffset for { select { @@ -313,19 +309,10 @@ func (ts *tailerSrc) runSaveState() { lastSavedOffset = offset case <-ts.tailer.FileDeletedCh: log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath) - os.Remove(ts.stateFilePath) - //for i := 0; i < fileDeleteMaxRetries; i++ { - // err := os.Remove(ts.stateFilePath) - // if err == nil { - // return - // } - // log.Printf("W! [logfile] error occurred deleting state file %s: %v", ts.stateFilePath, err) - // time.Sleep(waitDuration) - // - // waitDuration *= 2 - //} - //// reaching here means we exhausted retries on deleting the state file - //log.Printf("E! [logfile] failed to delete state file %s", ts.stateFilePath) + err := os.Remove(ts.stateFilePath) + if err != nil { + log.Printf("E! [logfile] Error happened while deleting state file %s on cleanup", ts.stateFilePath) + } return case <-ts.done: err := ts.saveState(offset.offset) From 817cd326ed743ebb609253e770376567fa66705b Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Fri, 29 Apr 2022 16:09:44 -0400 Subject: [PATCH 18/19] update test description --- integration/test/cloudwatchlogs/publish_logs_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/integration/test/cloudwatchlogs/publish_logs_test.go b/integration/test/cloudwatchlogs/publish_logs_test.go index 78c5586dd8..a279e5a28d 100644 --- a/integration/test/cloudwatchlogs/publish_logs_test.go +++ b/integration/test/cloudwatchlogs/publish_logs_test.go @@ -49,6 +49,8 @@ var testParameters = []input{ }, } +// TestWriteLogsToCloudWatch writes N number of logs, and then validates that N logs +// are queryable from CloudWatch Logs func TestWriteLogsToCloudWatch(t *testing.T) { // this uses the {instance_id} placeholder in the agent configuration, // so we need to determine the host's instance ID for validation @@ -78,7 +80,12 @@ func TestWriteLogsToCloudWatch(t *testing.T) { } } -// Validate https://github.com/aws/amazon-cloudwatch-agent/issues/447 +// TestRotatingLogsDoesNotSkipLines validates https://github.com/aws/amazon-cloudwatch-agent/issues/447 +// The following should happen in the test: +// 1. A log line of size N should be written +// 2. The file should be rotated, and a new log line of size N should be written +// 3. The file should be rotated again, and a new log line of size GREATER THAN N should be written +// 4. All three log lines, in full, should be visible in CloudWatch Logs func TestRotatingLogsDoesNotSkipLines(t *testing.T) { cfgFilePath := "resources/config_log_rotated.json" From e30cc0d4f974ebf424bb9719ec7dec1ce7b177e0 Mon Sep 17 00:00:00 2001 From: Andrew Huynh Date: Mon, 2 May 2022 15:49:35 -0400 Subject: [PATCH 19/19] added comments --- integration/test/cloudwatchlogs/publish_logs_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/integration/test/cloudwatchlogs/publish_logs_test.go b/integration/test/cloudwatchlogs/publish_logs_test.go index a279e5a28d..cac789a2c6 100644 --- a/integration/test/cloudwatchlogs/publish_logs_test.go +++ b/integration/test/cloudwatchlogs/publish_logs_test.go @@ -110,6 +110,11 @@ func TestRotatingLogsDoesNotSkipLines(t *testing.T) { time.Sleep(agentRuntime) test.StopAgent() + // These expected log lines are created using resources/write_and_rotate_logs.py, + // which are taken directly from the repro case in https://github.com/aws/amazon-cloudwatch-agent/issues/447 + // logging.info(json.dumps({"Metric": "12345"*10})) + // logging.info(json.dumps({"Metric": "09876"*10})) + // logging.info({"Metric": "1234567890"*10}) lines := []string{ fmt.Sprintf("{\"Metric\": \"%s\"}", strings.Repeat("12345", 10)), fmt.Sprintf("{\"Metric\": \"%s\"}", strings.Repeat("09876", 10)),