From f323b363dcd73322907588b26bd8ef359183f32a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= <kvch@users.noreply.github.com>
Date: Thu, 11 Mar 2021 10:14:22 +0100
Subject: [PATCH] Port four Harvester tests of log input to filestream in
 Golang (#24250)

---
 .../filestream/input_integration_test.go      | 109 +++++++++++++++++-
 libbeat/reader/readfile/line_test.go          |  35 ++++++
 2 files changed, 140 insertions(+), 4 deletions(-)

diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go
index e3c08b434ac..fecceb38789 100644
--- a/filebeat/input/filestream/input_integration_test.go
+++ b/filebeat/input/filestream/input_integration_test.go
@@ -21,8 +21,11 @@ package filestream
 
 import (
 	"context"
+	"os"
 	"runtime"
 	"testing"
+
+	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
 )
 
 // test_close_renamed from test_harvester.py
@@ -57,7 +60,6 @@ func TestFilestreamCloseRenamed(t *testing.T) {
 	newerTestlines := []byte("new first log line\nnew second log line\n")
 	env.mustWriteLinesToFile(testlogName, newerTestlines)
 
-	// new two events arrived
 	env.waitUntilEventCount(3)
 
 	cancelInput()
@@ -67,6 +69,46 @@ func TestFilestreamCloseRenamed(t *testing.T) {
 	env.requireOffsetInRegistry(testlogName, len(newerTestlines))
 }
 
+// test_close_removed from test_harvester.py
+func TestFilestreamCloseRemoved(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"paths":                                []string{env.abspath(testlogName) + "*"},
+		"prospector.scanner.check_interval":    "24h",
+		"close.on_state_change.check_interval": "1ms",
+		"close.on_state_change.removed":        "true",
+	})
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	testlines := []byte("first log line\n")
+	env.mustWriteLinesToFile(testlogName, testlines)
+
+	// first event has made it successfully
+	env.waitUntilEventCount(1)
+
+	env.requireOffsetInRegistry(testlogName, len(testlines))
+
+	fi, err := os.Stat(env.abspath(testlogName))
+	if err != nil {
+		t.Fatalf("cannot stat file: %+v", err)
+	}
+
+	env.mustRemoveFile(testlogName)
+
+	env.waitUntilHarvesterIsDone()
+
+	cancelInput()
+	env.waitUntilInputStops()
+
+	identifier, _ := newINodeDeviceIdentifier(nil)
+	src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: env.abspath(testlogName)})
+	env.requireOffsetInRegistryByID(src.Name(), len(testlines))
+}
+
 // test_close_eof from test_harvester.py
 func TestFilestreamCloseEOF(t *testing.T) {
 	env := newInputTestingEnvironment(t)
@@ -78,13 +120,13 @@ func TestFilestreamCloseEOF(t *testing.T) {
 		"close.reader.on_eof":               "true",
 	})
 
-	ctx, cancelInput := context.WithCancel(context.Background())
-	env.startInput(ctx, inp)
-
 	testlines := []byte("first log line\n")
 	expectedOffset := len(testlines)
 	env.mustWriteLinesToFile(testlogName, testlines)
 
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
 	// first event has made it successfully
 	env.waitUntilEventCount(1)
 	env.requireOffsetInRegistry(testlogName, expectedOffset)
@@ -100,3 +142,62 @@ func TestFilestreamCloseEOF(t *testing.T) {
 
 	env.requireOffsetInRegistry(testlogName, expectedOffset)
 }
+
+// test_empty_lines from test_harvester.py
+func TestFilestreamEmptyLine(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"paths":                             []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval": "1ms",
+	})
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	testlines := []byte("first log line\nnext is an empty line\n")
+	env.mustWriteLinesToFile(testlogName, testlines)
+
+	env.waitUntilEventCount(2)
+	env.requireOffsetInRegistry(testlogName, len(testlines))
+
+	moreTestlines := []byte("\nafter an empty line\n")
+	env.mustAppendLinesToFile(testlogName, moreTestlines)
+
+	env.waitUntilEventCount(3)
+	env.requireEventsReceived([]string{
+		"first log line",
+		"next is an empty line",
+		"after an empty line",
+	})
+
+	cancelInput()
+	env.waitUntilInputStops()
+
+	env.requireOffsetInRegistry(testlogName, len(testlines)+len(moreTestlines))
+}
+
+// test_empty_lines_only from test_harvester.py
+// This test differs from the original because in filestream
+// input offset is no longer persisted when the line is empty.
+func TestFilestreamEmptyLinesOnly(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"paths":                             []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval": "1ms",
+	})
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	testlines := []byte("\n\n\n")
+	env.mustWriteLinesToFile(testlogName, testlines)
+
+	cancelInput()
+	env.waitUntilInputStops()
+
+	env.requireNoEntryInRegistry(testlogName)
+}
diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go
index d91544162c5..df28f3345b9 100644
--- a/libbeat/reader/readfile/line_test.go
+++ b/libbeat/reader/readfile/line_test.go
@@ -30,6 +30,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/text/transform"
 
 	"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
@@ -390,3 +391,37 @@ func TestMaxBytesLimit(t *testing.T) {
 		}
 	}
 }
+
+// test_exceed_buffer from test_harvester.py
+func TestBufferSize(t *testing.T) {
+	lines := []string{
+		"first line is too long\n",
+		"second line is too long\n",
+		"third line too long\n",
+		"OK\n",
+	}
+
+	codecFactory, _ := encoding.FindEncoding("")
+	codec, _ := codecFactory(bytes.NewBuffer(nil))
+	bufferSize := 10
+
+	in := ioutil.NopCloser(strings.NewReader(strings.Join(lines, "")))
+	reader, err := NewLineReader(in, Config{codec, bufferSize, AutoLineTerminator, 1024})
+	if err != nil {
+		t.Fatal("failed to initialize reader:", err)
+	}
+
+	for i := 0; i < len(lines); i++ {
+		b, n, err := reader.Next()
+		if err != nil {
+			if err == io.EOF {
+				break
+			} else {
+				t.Fatal("unexpected error:", err)
+			}
+		}
+
+		require.Equal(t, n, len(lines[i]))
+		require.Equal(t, string(b[:n]), lines[i])
+	}
+}