From 75f202a8de5e4366e6953529aae76d56c1b9b37d Mon Sep 17 00:00:00 2001 From: Adam <90734270+adam-mateen@users.noreply.github.com> Date: Fri, 4 Nov 2022 12:52:24 -0500 Subject: [PATCH 1/4] Add unit test for AtomicCounter. --- internal/atomiccounter/atomiccounter_test.go | 48 ++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 internal/atomiccounter/atomiccounter_test.go diff --git a/internal/atomiccounter/atomiccounter_test.go b/internal/atomiccounter/atomiccounter_test.go new file mode 100644 index 0000000000..09fb50e803 --- /dev/null +++ b/internal/atomiccounter/atomiccounter_test.go @@ -0,0 +1,48 @@ +package atomiccounter + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAtomicCounter(t *testing.T) { + x := NewAtomicCounter() + assert.Equal(t, int64(0), x.Get()) + + for i := int64(0); i < 1000; i++ { + assert.Equal(t, i, x.Get()) + x.Increment() + } + + for i := int64(1000 - 1); i >= 1000; i-- { + assert.Equal(t, i, x.Get()) + x.Decrement() + } +} + +// TestAtomicCounterParallel runs many goroutines to inc and dec the same +// amount of times so that the expected end result is counter == 0. +func TestAtomicCounterParallel(t *testing.T) { + x := NewAtomicCounter() + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(2) + go func() { + for j := 0; j < 100; j++ { + x.Increment() + } + wg.Done() + }() + go func () { + for k := 0; k < 100; k++ { + x.Decrement() + } + wg.Done() + }() + } + wg.Wait() + assert.Equal(t, int64(0), x.Get()) +} \ No newline at end of file From 433c56cf36e38ff8db6e59a14ca9c35c9b8ee16d Mon Sep 17 00:00:00 2001 From: Adam <90734270+adam-mateen@users.noreply.github.com> Date: Fri, 4 Nov 2022 12:54:06 -0500 Subject: [PATCH 2/4] Run go fmt. --- internal/atomiccounter/atomiccounter.go | 2 +- internal/atomiccounter/atomiccounter_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/atomiccounter/atomiccounter.go b/internal/atomiccounter/atomiccounter.go index 5939a0e530..a5ad4c875f 100644 --- a/internal/atomiccounter/atomiccounter.go +++ b/internal/atomiccounter/atomiccounter.go @@ -25,4 +25,4 @@ func (ac *AtomicCounter) Decrement() { // It is just for logging the current value. func (ac *AtomicCounter) Get() int64 { return ac.val -} \ No newline at end of file +} diff --git a/internal/atomiccounter/atomiccounter_test.go b/internal/atomiccounter/atomiccounter_test.go index 09fb50e803..533e69be58 100644 --- a/internal/atomiccounter/atomiccounter_test.go +++ b/internal/atomiccounter/atomiccounter_test.go @@ -36,7 +36,7 @@ func TestAtomicCounterParallel(t *testing.T) { } wg.Done() }() - go func () { + go func() { for k := 0; k < 100; k++ { x.Decrement() } @@ -45,4 +45,4 @@ func TestAtomicCounterParallel(t *testing.T) { } wg.Wait() assert.Equal(t, int64(0), x.Get()) -} \ No newline at end of file +} From cc8f93a649fdae87a4cb1b3abb5f6c2c2a30e317 Mon Sep 17 00:00:00 2001 From: Adam <90734270+adam-mateen@users.noreply.github.com> Date: Fri, 4 Nov 2022 13:18:04 -0500 Subject: [PATCH 3/4] Fix loop condition. --- internal/atomiccounter/atomiccounter_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/atomiccounter/atomiccounter_test.go b/internal/atomiccounter/atomiccounter_test.go index 533e69be58..3f616b448c 100644 --- a/internal/atomiccounter/atomiccounter_test.go +++ b/internal/atomiccounter/atomiccounter_test.go @@ -7,6 +7,8 @@ import ( "github.com/stretchr/testify/assert" ) +// TestAtomicCounter increments then decrements the counter and verifies the +// value each time. func TestAtomicCounter(t *testing.T) { x := NewAtomicCounter() assert.Equal(t, int64(0), x.Get()) @@ -16,7 +18,7 @@ func TestAtomicCounter(t *testing.T) { x.Increment() } - for i := int64(1000 - 1); i >= 1000; i-- { + for i := int64(1000); i >= 0; i-- { assert.Equal(t, i, x.Get()) x.Decrement() } From 5e23dd10f787c1d92f2292475ce8925b71ed7082 Mon Sep 17 00:00:00 2001 From: Adam <90734270+adam-mateen@users.noreply.github.com> Date: Fri, 4 Nov 2022 14:09:25 -0500 Subject: [PATCH 4/4] Remove internal/atomiccounter. Use sync/atomic from Golang 1.19. --- internal/atomiccounter/atomiccounter.go | 28 ----------- internal/atomiccounter/atomiccounter_test.go | 50 -------------------- logs/logs.go | 2 +- plugins/inputs/logfile/tail/tail.go | 10 ++-- plugins/inputs/logfile/tail/tail_test.go | 10 ++-- plugins/inputs/logfile/tailersrc_test.go | 6 +-- 6 files changed, 14 insertions(+), 92 deletions(-) delete mode 100644 internal/atomiccounter/atomiccounter.go delete mode 100644 internal/atomiccounter/atomiccounter_test.go diff --git a/internal/atomiccounter/atomiccounter.go b/internal/atomiccounter/atomiccounter.go deleted file mode 100644 index a5ad4c875f..0000000000 --- a/internal/atomiccounter/atomiccounter.go +++ /dev/null @@ -1,28 +0,0 @@ -package atomiccounter - -import ( - "sync/atomic" -) - -type AtomicCounter struct { - val int64 -} - -// NewAtomicCounter returns a new counter with the default value of 0. -func NewAtomicCounter() AtomicCounter { - return AtomicCounter{} -} - -func (ac *AtomicCounter) Increment() { - atomic.AddInt64(&ac.val, 1) -} - -func (ac *AtomicCounter) Decrement() { - atomic.AddInt64(&ac.val, -1) -} - -// Get is not safe to use for synchronizing work between goroutines. -// It is just for logging the current value. -func (ac *AtomicCounter) Get() int64 { - return ac.val -} diff --git a/internal/atomiccounter/atomiccounter_test.go b/internal/atomiccounter/atomiccounter_test.go deleted file mode 100644 index 3f616b448c..0000000000 --- a/internal/atomiccounter/atomiccounter_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package atomiccounter - -import ( - "sync" - "testing" - - "github.com/stretchr/testify/assert" -) - -// TestAtomicCounter increments then decrements the counter and verifies the -// value each time. -func TestAtomicCounter(t *testing.T) { - x := NewAtomicCounter() - assert.Equal(t, int64(0), x.Get()) - - for i := int64(0); i < 1000; i++ { - assert.Equal(t, i, x.Get()) - x.Increment() - } - - for i := int64(1000); i >= 0; i-- { - assert.Equal(t, i, x.Get()) - x.Decrement() - } -} - -// TestAtomicCounterParallel runs many goroutines to inc and dec the same -// amount of times so that the expected end result is counter == 0. -func TestAtomicCounterParallel(t *testing.T) { - x := NewAtomicCounter() - var wg sync.WaitGroup - - for i := 0; i < 100; i++ { - wg.Add(2) - go func() { - for j := 0; j < 100; j++ { - x.Increment() - } - wg.Done() - }() - go func() { - for k := 0; k < 100; k++ { - x.Decrement() - } - wg.Done() - }() - } - wg.Wait() - assert.Equal(t, int64(0), x.Get()) -} diff --git a/logs/logs.go b/logs/logs.go index 950fe88f0b..f228bf8b0e 100644 --- a/logs/logs.go +++ b/logs/logs.go @@ -98,7 +98,7 @@ func (l *LogAgent) Run(ctx context.Context) { for { select { case <-t.C: - log.Printf("D! [logagent] open file count, %v", tail.OpenFileCount.Get()) + log.Printf("D! [logagent] open file count, %v", tail.OpenFileCount.Load()) for _, c := range l.collections { srcs := c.FindLogSrc() for _, src := range srcs { diff --git a/plugins/inputs/logfile/tail/tail.go b/plugins/inputs/logfile/tail/tail.go index 0f1d82d924..bc498d2fc8 100644 --- a/plugins/inputs/logfile/tail/tail.go +++ b/plugins/inputs/logfile/tail/tail.go @@ -10,9 +10,9 @@ import ( "io" "os" "sync" + "sync/atomic" "time" - "github.com/aws/amazon-cloudwatch-agent/internal/atomiccounter" "github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/tail/watch" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/models" @@ -24,7 +24,7 @@ var ( ErrDeletedNotReOpen = errors.New("File was deleted, tail should now stop") exitOnDeletionCheckDuration = time.Minute exitOnDeletionWaitDuration = 5 * time.Minute - OpenFileCount = atomiccounter.NewAtomicCounter() + OpenFileCount atomic.Int64 ) type Line struct { @@ -122,7 +122,7 @@ func TailFile(filename string, config Config) (*Tail, error) { if err != nil { return nil, err } - OpenFileCount.Increment() + OpenFileCount.Add(1) } if !config.ReOpen { @@ -184,7 +184,7 @@ func (tail *Tail) closeFile() { if tail.file != nil { tail.file.Close() tail.file = nil - OpenFileCount.Decrement() + OpenFileCount.Add(-1) } } @@ -209,7 +209,7 @@ func (tail *Tail) reopen() error { } break } - OpenFileCount.Increment() + OpenFileCount.Add(1) return nil } diff --git a/plugins/inputs/logfile/tail/tail_test.go b/plugins/inputs/logfile/tail/tail_test.go index 79acf75a04..e63afe9d12 100644 --- a/plugins/inputs/logfile/tail/tail_test.go +++ b/plugins/inputs/logfile/tail/tail_test.go @@ -99,7 +99,7 @@ func TestStopAtEOF(t *testing.T) { assert.Equal(t, errStopAtEOF, tail.Err()) // Read to EOF - for i := 0; i < linesWrittenToFile - 3; i++ { + for i := 0; i < linesWrittenToFile-3; i++ { <-tail.Lines } @@ -107,7 +107,7 @@ func TestStopAtEOF(t *testing.T) { select { case <-done: t.Log("StopAtEOF() completed (as expected)") - case <- time.After(time.Second * 1): + case <-time.After(time.Second * 1): t.Fatalf("StopAtEOF() has not completed") } @@ -148,7 +148,7 @@ func setup(t *testing.T) (*os.File, *Tail, *testLogger) { if err != nil { t.Fatalf("failed to tail file %v: %v", tmpfile.Name(), err) } - // Cannot expect OpenFileCount.Get() to be 1 because the TailFile struct + // Cannot expect OpenFileCount to be 1 because the TailFile struct // was not created with MustExist=true, so file may not yet be opened. return tmpfile, tail, &tl } @@ -165,7 +165,7 @@ func readThreelines(t *testing.T, tail *Tail) { } } // If file was readable, then expect it to exist. - assert.Equal(t, int64(1), OpenFileCount.Get()) + assert.Equal(t, int64(1), OpenFileCount.Load()) } func verifyTailerLogging(t *testing.T, tlog *testLogger, expectedErrorMsg string) { @@ -182,7 +182,7 @@ func verifyTailerLogging(t *testing.T, tlog *testLogger, expectedErrorMsg string func verifyTailerExited(t *testing.T, tail *Tail) { select { case <-tail.Dead(): - assert.Equal(t, int64(0), OpenFileCount.Get()) + assert.Equal(t, int64(0), OpenFileCount.Load()) return default: t.Errorf("Tailer is still alive after file removed and wait period") diff --git a/plugins/inputs/logfile/tailersrc_test.go b/plugins/inputs/logfile/tailersrc_test.go index a1a73a3327..04438238c9 100644 --- a/plugins/inputs/logfile/tailersrc_test.go +++ b/plugins/inputs/logfile/tailersrc_test.go @@ -46,7 +46,7 @@ func TestTailerSrc(t *testing.T) { if err != nil { t.Errorf("Failed to create temp file: %v", err) } - beforeCount := tail.OpenFileCount.Get() + beforeCount := tail.OpenFileCount.Load() tailer, err := tail.TailFile(file.Name(), tail.Config{ ReOpen: false, @@ -63,7 +63,7 @@ func TestTailerSrc(t *testing.T) { t.Errorf("Failed to create tailer src for file %v with error: %v", file, err) return } - assert.Equal(t, beforeCount + 1, tail.OpenFileCount.Get()) + assert.Equal(t, beforeCount+1, tail.OpenFileCount.Load()) ts := NewTailerSrc( "groupName", "streamName", "destination", @@ -152,7 +152,7 @@ func TestTailerSrc(t *testing.T) { // Most test functions do not wait for the Tail to close the file. // They rely on Tail to detect file deletion and close the file. // So the count might be nonzero due to previous test cases. - assert.LessOrEqual(t, tail.OpenFileCount.Get(), beforeCount) + assert.LessOrEqual(t, tail.OpenFileCount.Load(), beforeCount) } func TestOffsetDoneCallBack(t *testing.T) {