diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 7ac82fe2608..9ed1fe24150 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -23,6 +23,7 @@ package filestream import ( "bytes" "context" + "fmt" "os" "runtime" "strconv" @@ -945,6 +946,52 @@ func TestGlobalIDCannotBeUsed(t *testing.T) { } } +// test_rotating_close_inactive_larger_write_rate from test_input.py +func TestRotatingCloseInactiveLargerWriteRate(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "id": "my-id", + "paths": []string{ + env.abspath("*"), + }, + "prospector.scanner.check_interval": "100ms", + "close.on_state_change.check_interval": "1s", + "close.on_state_change.inactive": "5s", + "ignore_older": "10s", + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + time.Sleep(1 * time.Second) + + rotations := 2 + iterations := 3 + r := 0 + for r <= rotations { + f, err := os.Create(env.abspath(testlogName)) + if err != nil { + t.Fatalf("failed to open log file: %+v", err) + } + n := 0 + for n <= iterations { + f.Write([]byte(fmt.Sprintf("hello world %d\n", r*iterations+n))) + n += 1 + time.Sleep(100 * time.Millisecond) + } + env.mustRenameFile(testlogName, testlogName+time.Now().Format("2006-01-02T15:04:05.99999999")) + r += 1 + } + + // allow for events to be send multiple times due to log rotation + env.waitUntilAtLeastEventCount(rotations * iterations) + + cancelInput() + env.waitUntilInputStops() +} + // test_rotating_close_inactive_low_write_rate from test_input.py func TestRotatingCloseInactiveLowWriteRate(t *testing.T) { env := newInputTestingEnvironment(t)