From f1edfc9c8923f872f2edf47962cc30b927cbbe73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 14 Oct 2020 18:28:28 +0200 Subject: [PATCH 1/4] Add tests of reader of filestream input --- filebeat/input/filestream/filestream.go | 10 +- filebeat/input/filestream/filestream_test.go | 229 +++++++++++++++++++ 2 files changed, 232 insertions(+), 7 deletions(-) create mode 100644 filebeat/input/filestream/filestream_test.go diff --git a/filebeat/input/filestream/filestream.go b/filebeat/input/filestream/filestream.go index 4d42bbf62423..1a559c67e060 100644 --- a/filebeat/input/filestream/filestream.go +++ b/filebeat/input/filestream/filestream.go @@ -138,20 +138,16 @@ func (f *logFile) Read(buf []byte) (int, error) { } func (f *logFile) startFileMonitoringIfNeeded() { - if f.closeInactive == 0 && f.closeAfterInterval == 0 { - return - } - - if f.closeInactive > 0 { + if f.closeInactive > 0 || f.closeRemoved || f.closeRenamed { f.tg.Go(func(ctx unison.Canceler) error { - f.closeIfTimeout(ctx) + f.periodicStateCheck(ctx) return nil }) } if f.closeAfterInterval > 0 { f.tg.Go(func(ctx unison.Canceler) error { - f.periodicStateCheck(ctx) + f.closeIfTimeout(ctx) return nil }) } diff --git a/filebeat/input/filestream/filestream_test.go b/filebeat/input/filestream/filestream_test.go new file mode 100644 index 000000000000..d8f26b6a5232 --- /dev/null +++ b/filebeat/input/filestream/filestream_test.go @@ -0,0 +1,229 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "context" + "io" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestLogFileTimedClosing(t *testing.T) { + testCases := map[string]struct { + inactive time.Duration + closeEOF bool + afterInterval time.Duration + expectedErr error + }{ + "read from file and close inactive": { + inactive: 2 * time.Second, + expectedErr: ErrClosed, + }, + "read from file and close after interval": { + afterInterval: 3 * time.Second, + expectedErr: ErrClosed, + }, + "read from file and close on EOF": { + closeEOF: true, + expectedErr: io.EOF, + }, + } + + for name, test := range testCases { + test := test + + f := createTestLogFile() + defer f.Close() + defer os.Remove(f.Name()) + + t.Run(name, func(t *testing.T) { + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Inactive: test.inactive, + }, + Reader: readerCloserConfig{ + OnEOF: test.closeEOF, + AfterInterval: test.afterInterval, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, test.expectedErr, err) + }) + } +} + +func TestLogFileTruncated(t *testing.T) { + f := createTestLogFile() + defer f.Close() + defer os.Remove(f.Name()) + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{}, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + go func() { + err := f.Truncate(0) + if err != nil { + panic(err) + } + }() + + for err == nil { + buf := make([]byte, 1024) + _, err = reader.Read(buf) + } + + assert.Equal(t, ErrFileTruncate, err) + +} + +func TestLogFileRenamed(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + renamedFile := f.Name() + ".renamed" + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Renamed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + go func() { + err := os.Rename(f.Name(), renamedFile) + if err != nil { + panic(err) + } + }() + + err = readUntilError(reader) + + assert.Equal(t, ErrClosed, err) + + os.Remove(renamedFile) +} + +func TestLogFileRemoved(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Removed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + go func() { + err := os.Remove(f.Name()) + if err != nil { + panic(err) + } + }() + + err = readUntilError(reader) + + assert.Equal(t, ErrClosed, err) +} + +func createTestLogFile() *os.File { + f := createEmptyTestFile() + content := []byte("first log line\nanother interesting line\na third log message\n") + if _, err := f.Write(content); err != nil { + panic(err) + } + if _, err := f.Seek(0, io.SeekStart); err != nil { + panic(err) + } + return f +} + +func createEmptyTestFile() *os.File { + f, err := ioutil.TempFile("", "filestream_reader_test") + if err != nil { + panic(err) + } + return f +} + +func readUntilError(reader *logFile) error { + buf := make([]byte, 1024) + _, err := reader.Read(buf) + for err == nil { + buf := make([]byte, 1024) + _, err = reader.Read(buf) + } + return err +} From b867d924e39c2d83791939ba4417d640e3afd572 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Thu, 15 Oct 2020 13:24:45 +0200 Subject: [PATCH 2/4] remove unnecessary goroutines --- filebeat/input/filestream/filestream_test.go | 60 +++++++------------- 1 file changed, 19 insertions(+), 41 deletions(-) diff --git a/filebeat/input/filestream/filestream_test.go b/filebeat/input/filestream/filestream_test.go index d8f26b6a5232..05de20455793 100644 --- a/filebeat/input/filestream/filestream_test.go +++ b/filebeat/input/filestream/filestream_test.go @@ -91,13 +91,7 @@ func TestLogFileTruncated(t *testing.T) { defer f.Close() defer os.Remove(f.Name()) - reader, err := newFileReader( - logp.L(), - context.TODO(), - f, - readerConfig{}, - closerConfig{}, - ) + reader, err := newFileReader(logp.L(), context.TODO(), f, readerConfig{}, closerConfig{}) if err != nil { t.Fatalf("error while creating logReader: %+v", err) } @@ -106,20 +100,14 @@ func TestLogFileTruncated(t *testing.T) { _, err = reader.Read(buf) assert.Nil(t, err) - go func() { - err := f.Truncate(0) - if err != nil { - panic(err) - } - }() - - for err == nil { - buf := make([]byte, 1024) - _, err = reader.Read(buf) + err = f.Truncate(0) + if err != nil { + t.Fatalf("error while truncating file: %+v", err) } - assert.Equal(t, ErrFileTruncate, err) + err = readUntilError(reader) + assert.Equal(t, ErrFileTruncate, err) } func TestLogFileRenamed(t *testing.T) { @@ -148,18 +136,15 @@ func TestLogFileRenamed(t *testing.T) { _, err = reader.Read(buf) assert.Nil(t, err) - go func() { - err := os.Rename(f.Name(), renamedFile) - if err != nil { - panic(err) - } - }() + err = os.Rename(f.Name(), renamedFile) + if err != nil { + t.Fatalf("error while renaming file: %+v", err) + } err = readUntilError(reader) + os.Remove(renamedFile) assert.Equal(t, ErrClosed, err) - - os.Remove(renamedFile) } func TestLogFileRemoved(t *testing.T) { @@ -186,12 +171,10 @@ func TestLogFileRemoved(t *testing.T) { _, err = reader.Read(buf) assert.Nil(t, err) - go func() { - err := os.Remove(f.Name()) - if err != nil { - panic(err) - } - }() + err = os.Remove(f.Name()) + if err != nil { + t.Fatalf("error while remove file: %+v", err) + } err = readUntilError(reader) @@ -199,7 +182,10 @@ func TestLogFileRemoved(t *testing.T) { } func createTestLogFile() *os.File { - f := createEmptyTestFile() + f, err := ioutil.TempFile("", "filestream_reader_test") + if err != nil { + panic(err) + } content := []byte("first log line\nanother interesting line\na third log message\n") if _, err := f.Write(content); err != nil { panic(err) @@ -210,14 +196,6 @@ func createTestLogFile() *os.File { return f } -func createEmptyTestFile() *os.File { - f, err := ioutil.TempFile("", "filestream_reader_test") - if err != nil { - panic(err) - } - return f -} - func readUntilError(reader *logFile) error { buf := make([]byte, 1024) _, err := reader.Read(buf) From ed9e65056663522fe9440e2ee32fbb94eadb9d3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 16 Oct 2020 11:19:02 +0200 Subject: [PATCH 3/4] separate tests --- filebeat/input/filestream/filestream_test.go | 71 ------------ .../filestream/filestream_test_non_windows.go | 102 ++++++++++++++++++ 2 files changed, 102 insertions(+), 71 deletions(-) create mode 100644 filebeat/input/filestream/filestream_test_non_windows.go diff --git a/filebeat/input/filestream/filestream_test.go b/filebeat/input/filestream/filestream_test.go index 05de20455793..329fa0ad55fb 100644 --- a/filebeat/input/filestream/filestream_test.go +++ b/filebeat/input/filestream/filestream_test.go @@ -110,77 +110,6 @@ func TestLogFileTruncated(t *testing.T) { assert.Equal(t, ErrFileTruncate, err) } -func TestLogFileRenamed(t *testing.T) { - f := createTestLogFile() - defer f.Close() - - renamedFile := f.Name() + ".renamed" - - reader, err := newFileReader( - logp.L(), - context.TODO(), - f, - readerConfig{}, - closerConfig{ - OnStateChange: stateChangeCloserConfig{ - CheckInterval: 1 * time.Second, - Renamed: true, - }, - }, - ) - if err != nil { - t.Fatalf("error while creating logReader: %+v", err) - } - - buf := make([]byte, 1024) - _, err = reader.Read(buf) - assert.Nil(t, err) - - err = os.Rename(f.Name(), renamedFile) - if err != nil { - t.Fatalf("error while renaming file: %+v", err) - } - - err = readUntilError(reader) - os.Remove(renamedFile) - - assert.Equal(t, ErrClosed, err) -} - -func TestLogFileRemoved(t *testing.T) { - f := createTestLogFile() - defer f.Close() - - reader, err := newFileReader( - logp.L(), - context.TODO(), - f, - readerConfig{}, - closerConfig{ - OnStateChange: stateChangeCloserConfig{ - CheckInterval: 1 * time.Second, - Removed: true, - }, - }, - ) - if err != nil { - t.Fatalf("error while creating logReader: %+v", err) - } - - buf := make([]byte, 1024) - _, err = reader.Read(buf) - assert.Nil(t, err) - - err = os.Remove(f.Name()) - if err != nil { - t.Fatalf("error while remove file: %+v", err) - } - - err = readUntilError(reader) - - assert.Equal(t, ErrClosed, err) -} - func createTestLogFile() *os.File { f, err := ioutil.TempFile("", "filestream_reader_test") if err != nil { diff --git a/filebeat/input/filestream/filestream_test_non_windows.go b/filebeat/input/filestream/filestream_test_non_windows.go new file mode 100644 index 000000000000..183d4f56f7ed --- /dev/null +++ b/filebeat/input/filestream/filestream_test_non_windows.go @@ -0,0 +1,102 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !windows + +package filestream + +import ( + "context" + "os" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +// these tests are separated as one cannot delete/rename files +// while another process is working with it on Windows +func TestLogFileRenamed(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + renamedFile := f.Name() + ".renamed" + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Renamed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = os.Rename(f.Name(), renamedFile) + if err != nil { + t.Fatalf("error while renaming file: %+v", err) + } + + err = readUntilError(reader) + os.Remove(renamedFile) + + assert.Equal(t, ErrClosed, err) +} + +func TestLogFileRemoved(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Removed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = os.Remove(f.Name()) + if err != nil { + t.Fatalf("error while remove file: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, ErrClosed, err) +} From 0a67c3c58bd49c686bc2d1bb0277bb000e9c3bbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 16 Oct 2020 11:29:26 +0200 Subject: [PATCH 4/4] add missing import --- filebeat/input/filestream/filestream_test_non_windows.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/filebeat/input/filestream/filestream_test_non_windows.go b/filebeat/input/filestream/filestream_test_non_windows.go index 183d4f56f7ed..9c2b33ed3de2 100644 --- a/filebeat/input/filestream/filestream_test_non_windows.go +++ b/filebeat/input/filestream/filestream_test_non_windows.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/logp" )