Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete statefile when tailer terminates due to an error #457

Merged
merged 19 commits into from
May 3, 2022
4 changes: 3 additions & 1 deletion integration/terraform/ec2/linux/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,6 @@ terraform destroy --auto-approve
6. make
7. aws-cli
8. CloudWatchAgentServerRole is attached
9. crontab
9. crontab
10. gcc
11. python3
8 changes: 8 additions & 0 deletions integration/test/agent_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ func RunShellScript(path string, args ...string) {
}
}

// RunCommand executes cmd through `bash -c` and waits for it to finish.
//
// If the command cannot be started or exits with a non-zero status, the
// process is terminated via log.Fatalf. The fatal message contains the
// command, the error, and the command's combined stdout and stderr, so the
// actual diagnostics written by the failing command are visible in the log.
func RunCommand(cmd string) {
	// CombinedOutput is used instead of Output: Output only returns stdout,
	// which is usually empty when a command fails, and hides what the command
	// wrote to stderr.
	out, err := exec.Command("bash", "-c", cmd).CombinedOutput()
	if err != nil {
		log.Fatalf("Error occurred when executing %s: %s | %s", cmd, err.Error(), string(out))
	}
}

func ReplaceLocalStackHostName(pathIn string) {
out, err := exec.Command("bash", "-c", "sed -i 's/localhost.localstack.cloud/'\"$LOCAL_STACK_HOST_NAME\"'/g' "+pathIn).Output()

Expand Down
52 changes: 47 additions & 5 deletions integration/test/cloudwatchlogs/publish_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/integration/test"
"log"
"os"
"strings"

"testing"
"time"
Expand Down Expand Up @@ -48,13 +49,15 @@ var testParameters = []input{
},
}

// TestWriteLogsToCloudWatch writes N number of logs, and then validates that N logs
// are queryable from CloudWatch Logs
func TestWriteLogsToCloudWatch(t *testing.T) {
// this uses the {instance_id} placeholder in the agent configuration,
// so we need to determine the host's instance ID for validation
instanceId := test.GetInstanceId()
log.Printf("Found instance id %s", instanceId)

defer cleanUp(instanceId)
defer test.DeleteLogGroupAndStream(instanceId, instanceId)

for _, param := range testParameters {
t.Run(param.testName, func(t *testing.T) {
Expand All @@ -77,6 +80,49 @@ func TestWriteLogsToCloudWatch(t *testing.T) {
}
}

// TestRotatingLogsDoesNotSkipLines validates https://github.com/aws/amazon-cloudwatch-agent/issues/447
// The following should happen in the test:
// 1. A log line of size N should be written
// 2. The file should be rotated, and a new log line of size N should be written
// 3. The file should be rotated again, and a new log line of size GREATER THAN N should be written
// 4. All three log lines, in full, should be visible in CloudWatch Logs
func TestRotatingLogsDoesNotSkipLines(t *testing.T) {
cfgFilePath := "resources/config_log_rotated.json"

instanceId := test.GetInstanceId()
log.Printf("Found instance id %s", instanceId)
logGroup := instanceId
logStream := instanceId + "Rotated"

defer test.DeleteLogGroupAndStream(logGroup, logStream)

start := time.Now()
test.CopyFile(cfgFilePath, configOutputPath)

test.StartAgent(configOutputPath)

// ensure that there is enough time between the "start" time and the first log line,
// so we don't miss it in the GetLogEvents call
time.Sleep(agentRuntime)
t.Log("Writing logs and rotating")
// execute the script used in the repro case
test.RunCommand("/usr/bin/python3 resources/write_and_rotate_logs.py")
Copy link
Contributor

@adam-mateen adam-mateen Apr 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the failure is better understood, do you think you could write Golang code instead of calling the python script?
I am fine with keeping the Python script, but I just wanted to check.

Or maybe it is better to keep the Python logger code, since it tests using something (the logger) that users will actually use, compared to custom Golang code that we write just to mimic the behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kind of on the fence about it. I can try to spend some time today to try doing it natively in Go again, but not at a higher priority than fixing this windows unit test failure. What's annoying is I manually tested on Windows and of course there's no issue then 🙄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried again briefly and didn't get it right. Pasting the function I wrote for deleting/recreating the log file in case there's something obvious I am missing. I just run this in a loop for the different log lines, and sleep a decent amount of time in between each write. I ran integration tests on my fork and it couldn't even find the log stream, and I don't think I want to invest much more time in this. I think framing it as "something that users will actually use" is a good enough ~~excuse~~ justification for me to keep the Python script.

// foo replaces the file at filePath with a newly created file that contains
// only logLine. The test is failed immediately if the file cannot be created
// or written.
func foo(t *testing.T, filePath, logLine string) {
	// Best-effort delete: the file may not exist yet, so the result is
	// deliberately ignored and the file is created from scratch below.
	_ = os.Remove(filePath)

	logFile, createErr := os.Create(filePath)
	if createErr != nil {
		t.Fatalf("Error occurred creating log file for writing: %v", createErr)
	}
	defer logFile.Close()

	log.Printf("Writing %s to %s", logLine, filePath)
	if _, writeErr := logFile.WriteString(logLine); writeErr != nil {
		t.Fatalf("Error occurred while writing to %s: %v", filePath, writeErr)
	}
}

time.Sleep(agentRuntime)
test.StopAgent()

// These expected log lines are created using resources/write_and_rotate_logs.py,
// which are taken directly from the repro case in https://github.com/aws/amazon-cloudwatch-agent/issues/447
// logging.info(json.dumps({"Metric": "12345"*10}))
// logging.info(json.dumps({"Metric": "09876"*10}))
// logging.info({"Metric": "1234567890"*10})
lines := []string{
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("{\"Metric\": \"%s\"}", strings.Repeat("12345", 10)),
fmt.Sprintf("{\"Metric\": \"%s\"}", strings.Repeat("09876", 10)),
fmt.Sprintf("{\"Metric\": \"%s\"}", strings.Repeat("1234567890", 10)),
}
test.ValidateLogsInOrder(t, logGroup, logStream, lines, start)
}

func writeLogs(t *testing.T, filePath string, iterations int) {
f, err := os.Create(filePath)
if err != nil {
Expand All @@ -99,7 +145,3 @@ func writeLogs(t *testing.T, filePath string, iterations int) {
time.Sleep(1 * time.Millisecond)
}
}

// cleanUp deletes the log group and the log stream that are both named after
// the given instance ID.
func cleanUp(instanceId string) {
	// The same value is used for the group name and the stream name.
	logGroup, logStream := instanceId, instanceId
	test.DeleteLogGroupAndStream(logGroup, logStream)
}
19 changes: 19 additions & 0 deletions integration/test/cloudwatchlogs/resources/config_log_rotated.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"agent": {
"run_as_user": "root"
},
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/tmp/rotate_me.log*",
"log_group_name": "{instance_id}",
"log_stream_name": "{instance_id}Rotated",
"timezone": "UTC"
}
]
}
}
}
}
28 changes: 28 additions & 0 deletions integration/test/cloudwatchlogs/resources/write_and_rotate_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Lifted this from https://github.com/aws/amazon-cloudwatch-agent/issues/447
because I was not able to adequately reproduce the issue natively in Go,
directly in the integration test code.
"""
import json
import logging
import time
from logging.handlers import TimedRotatingFileHandler

# Get the root logger and let INFO-level messages through.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Write to /tmp/rotate_me.log and rotate the file every 10 seconds.
handler = TimedRotatingFileHandler("/tmp/rotate_me.log", when="S", interval=10)
logger.addHandler(handler)

# First message: a JSON string written to the fresh log file.
logging.info(json.dumps({"Metric": "12345"*10}))
# Sleep longer than the 10 second rotation interval so that the file will
# rotate upon the next log message.
time.sleep(15)
# Second message, with the same byte length as the first one. In the repro
# case this one did not appear, since byte length of message == byte length
# of the old log file.
logging.info(json.dumps({"Metric": "09876"*10}))
# Sleep again so that the file will rotate upon the next log message.
time.sleep(15)
# Third, longer message. In the repro case this message was only partially
# written. Note that the dict itself is passed here (no json.dumps), exactly
# as in the original repro.
logging.info({"Metric": "1234567890"*10})
60 changes: 60 additions & 0 deletions integration/test/cwl_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"errors"
"log"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -104,6 +105,65 @@ func DeleteLogGroupAndStream(logGroupName, logStreamName string) {
}
}

// ValidateLogsInOrder takes a log group, log stream, a list of specific log lines and a timestamp.
// It should query the given log stream for log events, and then confirm that the log lines that are
// returned match the expected log lines. This also sanitizes the log lines from both the output and
// the expected lines input to ensure that they don't diverge in JSON representation (" vs ')
func ValidateLogsInOrder(t *testing.T, logGroup, logStream string, logLines []string, since time.Time) {
log.Printf("Checking %s/%s since %s for %d expected logs", logGroup, logStream, since.UTC().Format(time.RFC3339), len(logLines))
cwlClient, clientContext, err := getCloudWatchLogsClient()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to return the error to begin with? We can still put

if err != nil {
		t.Fatalf("Error occurred while creating CloudWatch Logs SDK client: %v", err.Error())
	}

inside the getCloudWatchLogsClient unless you want to change the error handling stuff here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing that means changing the function to accept a *testing.T parameter, like

func getCloudwatchLogsClient(t *testing.T) {}

I am indifferent on how we do it. The end result is the same. I just didn't see much reason to pass the testing struct down another level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with passing the struct down one level deeper, since it would reduce code. My main question was: do we let each function handle its own custom response to the error? Since we don't, I would prefer passing it down; however, it's fine for you to keep it as is, since the end result is the same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree with the current approach for the CloudWatch metrics client creation in agent_util.GetCWClient() because it uses log.Fatal which makes it harder to differentiate that from the individual test executions.

if err != nil {
t.Fatalf("Error occurred while creating CloudWatch Logs SDK client: %v", err.Error())
}

sinceMs := since.UnixNano() / 1e6 // convert to millisecond timestamp

// https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_GetLogEvents.html
// GetLogEvents can return an empty result while still having more log events on a subsequent page,
// so rather than expecting all the events to show up in one GetLogEvents API call, we need to paginate.
params := &cloudwatchlogs.GetLogEventsInput{
LogGroupName: aws.String(logGroup),
LogStreamName: aws.String(logStream),
StartTime: aws.Int64(sinceMs),
StartFromHead: aws.Bool(true), // read from the beginning
}

foundLogs := make([]string, 0)
var output *cloudwatchlogs.GetLogEventsOutput
var nextToken *string

for {
if nextToken != nil {
params.NextToken = nextToken
}
output, err = cwlClient.GetLogEvents(*clientContext, params)

if err != nil {
t.Fatalf("Error occurred while getting log events: %v", err.Error())
}

for _, e := range output.Events {
foundLogs = append(foundLogs, *e.Message)
}

if nextToken != nil && output.NextForwardToken != nil && *output.NextForwardToken == *nextToken {
// From the docs: If you have reached the end of the stream, it returns the same token you passed in.
log.Printf("Done paginating log events for %s/%s and found %d logs", logGroup, logStream, len(foundLogs))
break
}

nextToken = output.NextForwardToken
}

// Validate that each of the logs are found, in order and in full.
assert.Len(t, foundLogs, len(logLines))
for i := 0; i < len(logLines); i++ {
expected := strings.ReplaceAll(logLines[i], "'", "\"")
actual := strings.ReplaceAll(foundLogs[i], "'", "\"")
assert.Equal(t, expected, actual)
}
}

// getCloudWatchLogsClient returns a singleton SDK client for interfacing with CloudWatch Logs
func getCloudWatchLogsClient() (*cloudwatchlogs.Client, *context.Context, error) {
if cwl == nil {
Expand Down
16 changes: 11 additions & 5 deletions plugins/inputs/logfile/logfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,10 +716,11 @@ func TestLogsFileWithInvalidOffset(t *testing.T) {

// TestLogsFileRecreate verifies that if a LogSrc matching a LogConfig is detected,
// We only receive log lines beginning at the offset specified in the corresponding state-file.
// And if the file happens to get deleted and recreated we expect to receive log lines beginning
// at that same offset in the state file.
// And if the file happens to get deleted and recreated we expect to receive log lines
// from the beginning of the file. See https://github.com/aws/amazon-cloudwatch-agent/issues/447
func TestLogsFileRecreate(t *testing.T) {
multilineWaitPeriod = 10 * time.Millisecond

logEntryString := "xxxxxxxxxxContentAfterOffset"
expectedContent := "ContentAfterOffset"

Expand All @@ -734,9 +735,12 @@ func TestLogsFileRecreate(t *testing.T) {
defer os.Remove(stateDir)

stateFileName := filepath.Join(stateDir, escapeFilePath(tmpfile.Name()))
stateFile, err := os.OpenFile(stateFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
stateFile, err := os.OpenFile(stateFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
_, err = stateFile.WriteString("10")
require.NoError(t, err)
err = stateFile.Close()
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
defer os.Remove(stateFileName)

tt := NewLogFile()
Expand Down Expand Up @@ -800,9 +804,11 @@ func TestLogsFileRecreate(t *testing.T) {
}
})

// after the file gets deleted, the state file should be deleted too, so
// the tailer should start from the beginning.
e = <-evts
if e.Message() != expectedContent {
t.Errorf("Wrong log found after file replacement: \n% x\nExpecting:\n% x\n", e.Message(), expectedContent)
if e.Message() != logEntryString {
t.Errorf("Wrong log found after file replacement: \n% x\nExpecting:\n% x\n", e.Message(), logEntryString)
}

lsrc.Stop()
Expand Down
10 changes: 7 additions & 3 deletions plugins/inputs/logfile/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type Tail struct {
dropCnt int

lk sync.Mutex

FileDeletedCh chan bool
}

// TailFile begins tailing the file. Output stream is made available
Expand All @@ -95,9 +97,10 @@ func TailFile(filename string, config Config) (*Tail, error) {
}

t := &Tail{
Filename: filename,
Lines: make(chan *Line),
Config: config,
Filename: filename,
Lines: make(chan *Line),
Config: config,
FileDeletedCh: make(chan bool),
}

// when Logger was not specified in config, create new one
Expand Down Expand Up @@ -385,6 +388,7 @@ func (tail *Tail) tailFileSync() {
err := tail.waitForChanges()
if err != nil {
if err == ErrDeletedNotReOpen {
close(tail.FileDeletedCh)
for {
line, errReadLine := tail.readLine()
if errReadLine == nil {
Expand Down
8 changes: 8 additions & 0 deletions plugins/inputs/logfile/tailersrc.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func (ts *tailerSrc) cleanUp() {
for _, clf := range ts.cleanUpFns {
clf()
}

if ts.outputFn != nil {
ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest
}
Expand All @@ -306,6 +307,13 @@ func (ts *tailerSrc) runSaveState() {
continue
}
lastSavedOffset = offset
case <-ts.tailer.FileDeletedCh:
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath)
err := os.Remove(ts.stateFilePath)
if err != nil {
log.Printf("E! [logfile] Error happened while deleting state file %s on cleanup", ts.stateFilePath)
}
return
case <-ts.done:
err := ts.saveState(offset.offset)
if err != nil {
Expand Down