Skip to content

Commit

Permalink
Port four Harvester tests of log input to filestream in Golang (#24250)
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch authored Mar 11, 2021
1 parent b4ac0ee commit f323b36
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 4 deletions.
109 changes: 105 additions & 4 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ package filestream

import (
"context"
"os"
"runtime"
"testing"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
)

// test_close_renamed from test_harvester.py
Expand Down Expand Up @@ -57,7 +60,6 @@ func TestFilestreamCloseRenamed(t *testing.T) {
newerTestlines := []byte("new first log line\nnew second log line\n")
env.mustWriteLinesToFile(testlogName, newerTestlines)

// new two events arrived
env.waitUntilEventCount(3)

cancelInput()
Expand All @@ -67,6 +69,46 @@ func TestFilestreamCloseRenamed(t *testing.T) {
env.requireOffsetInRegistry(testlogName, len(newerTestlines))
}

// test_close_removed from test_harvester.py
func TestFilestreamCloseRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.removed": "true",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\n")
env.mustWriteLinesToFile(testlogName, testlines)

// first event has made it successfully
env.waitUntilEventCount(1)

env.requireOffsetInRegistry(testlogName, len(testlines))

fi, err := os.Stat(env.abspath(testlogName))
if err != nil {
t.Fatalf("cannot stat file: %+v", err)
}

env.mustRemoveFile(testlogName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()

identifier, _ := newINodeDeviceIdentifier(nil)
src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: env.abspath(testlogName)})
env.requireOffsetInRegistryByID(src.Name(), len(testlines))
}

// test_close_eof from test_harvester.py
func TestFilestreamCloseEOF(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand All @@ -78,13 +120,13 @@ func TestFilestreamCloseEOF(t *testing.T) {
"close.reader.on_eof": "true",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\n")
expectedOffset := len(testlines)
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

// first event has made it successfully
env.waitUntilEventCount(1)
env.requireOffsetInRegistry(testlogName, expectedOffset)
Expand All @@ -100,3 +142,62 @@ func TestFilestreamCloseEOF(t *testing.T) {

env.requireOffsetInRegistry(testlogName, expectedOffset)
}

// test_empty_lines from test_harvester.py
func TestFilestreamEmptyLine(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\nnext is an empty line\n")
env.mustWriteLinesToFile(testlogName, testlines)

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogName, len(testlines))

moreTestlines := []byte("\nafter an empty line\n")
env.mustAppendLinesToFile(testlogName, moreTestlines)

env.waitUntilEventCount(3)
env.requireEventsReceived([]string{
"first log line",
"next is an empty line",
"after an empty line",
})

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, len(testlines)+len(moreTestlines))
}

// test_empty_lines_only from test_harvester.py
// This test differs from the original because in filestream
// input offset is no longer persisted when the line is empty.
func TestFilestreamEmptyLinesOnly(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("\n\n\n")
env.mustWriteLinesToFile(testlogName, testlines)

cancelInput()
env.waitUntilInputStops()

env.requireNoEntryInRegistry(testlogName)
}
35 changes: 35 additions & 0 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/text/transform"

"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
Expand Down Expand Up @@ -390,3 +391,37 @@ func TestMaxBytesLimit(t *testing.T) {
}
}
}

// test_exceed_buffer from test_harvester.py
func TestBufferSize(t *testing.T) {
lines := []string{
"first line is too long\n",
"second line is too long\n",
"third line too long\n",
"OK\n",
}

codecFactory, _ := encoding.FindEncoding("")
codec, _ := codecFactory(bytes.NewBuffer(nil))
bufferSize := 10

in := ioutil.NopCloser(strings.NewReader(strings.Join(lines, "")))
reader, err := NewLineReader(in, Config{codec, bufferSize, AutoLineTerminator, 1024})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}

for i := 0; i < len(lines); i++ {
b, n, err := reader.Next()
if err != nil {
if err == io.EOF {
break
} else {
t.Fatal("unexpected error:", err)
}
}

require.Equal(t, n, len(lines[i]))
require.Equal(t, string(b[:n]), lines[i])
}
}

0 comments on commit f323b36

Please sign in to comment.