diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 945cde1337bc..10f24ddf1ac0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -174,6 +174,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix event.kind for system/syslog pipeline {issue}20365[20365] {pull}20390[20390] - Fix event.type for zeek/ssl and duplicate event.category for zeek/connection {pull}20696[20696] - Add json body check for sqs message. {pull}21727[21727] +- Properly update offset in case of unparasable line. {pull}22685[22685] - Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". {pull}22721[22721] {issue}22716[22716] - Fix cisco umbrella module config by adding input variable. {pull}22892[22892] - Fix network.direction logic in zeek connection fileset. {pull}22967[22967] diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 8f21cb505cee..e436e602204e 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -309,9 +309,6 @@ func (inp *filestream) readFromSource( s.Offset = 0 case ErrClosed: log.Info("Reader was closed. Closing.") - case reader.ErrLineUnparsable: - log.Info("Skipping unparsable line in file.") - continue default: log.Errorf("Read line error: %v", err) } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 6b16861f8ece..0d4e6d6b539b 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -331,10 +331,6 @@ func (h *Harvester) Run() error { logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) - case reader.ErrLineUnparsable: - logp.Info("Skipping unparsable line in file: %v", h.state.Source) - //line unparsable, go to next line - continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } diff --git a/filebeat/tests/files/logs/docker_corrupted.log b/filebeat/tests/files/logs/docker_corrupted.log new file mode 100644 index 000000000000..b241a2691b98 --- /dev/null +++ b/filebeat/tests/files/logs/docker_corrupted.log @@ -0,0 +1,21 @@ +{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"} +{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"} +{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"} +{"log":"cp etc/packetbeat.template.json /build/packetbeat.template.json\n","stream":"stdout","time":"2016-03-02T22:59:04.639782988Z"} +{"log":"# linux\n","stream":"stdout","time":"2016-03-02T22:59:04.646276053Z"} +"log":"cp packetbeat.yml /build/packetbeat-linux.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.647847045Z"} +{"log":"# binary\n","stream":"stdout","time":"2016-03-02T22:59:04.653740138Z"} +{"log":"cp packetbeat.yml /build/packetbeat-binary.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.655979016Z"} +{"log":"# darwin\n","stream":"stdout","time":"2016-03-02T22:59:04.661181197Z"} +{"log":"cp packetbeat.yml /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.662859769Z"} +{"log":"sed -i.bk 's/device: any/device: en0/' /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.66649744Z"} +{"log":"rm /build/packetbeat-darwin.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.701199002Z"} +{"log":"# win\n","stream":"stdout","time":"2016-03-02T22:59:04.705067809Z"} +{"log":"cp packetbeat.yml /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.706629907Z"} +{"log":"sed -i.bk 's/device: any/device: 0/' /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.711993313Z"} +{"log":"rm /build/packetbeat-win.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.757913979Z"} +{"log":"Compiling for windows/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:04.761895467Z"} +{"log":"Compiling for windows/386...\n","stream":"stdout","time":"2016-03-02T22:59:29.481736885Z"} +{"log":"Compiling for darwin/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:55.205334574Z"} +{"log":"Moving binaries to host...\n","stream":"stdout","time":"2016-03-02T23:00:15.140397826Z"} diff --git a/filebeat/tests/system/test_container.py b/filebeat/tests/system/test_container.py index ee0df7eb8e96..067eabd19772 100644 --- a/filebeat/tests/system/test_container.py +++ b/filebeat/tests/system/test_container.py @@ -66,3 +66,42 @@ def test_container_input_cri(self): output = self.read_output() assert len(output) == 1 assert output[0]["stream"] == "stdout" + + def test_container_input_registry_for_unparsable_lines(self): + """ + Test container input properly updates registry offset in case + of unparsable lines + """ + input_raw = """ +- type: container + paths: + - {}/logs/*.log +""" + self.render_config_template( + input_raw=input_raw.format(os.path.abspath(self.working_dir)), + inputs=False, + ) + + os.mkdir(self.working_dir + "/logs/") + self.copy_files(["logs/docker_corrupted.log"], + target_dir="logs") + + filebeat = self.start_beat() + + self.wait_until(lambda: self.output_has(lines=20)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + assert len(output) == 20 + assert output[19]["message"] == "Moving binaries to host..." + for o in output: + assert o["stream"] == "stdout" + + # Check that file exist + data = self.get_registry() + logs = self.log_access() + assert logs.contains("Parse line error") == True + # bytes of healthy file are 2244 so for the corrupted one should + # be 2244-1=2243 since we removed one character + assert data[0]["offset"] == 2243 diff --git a/libbeat/reader/reader.go b/libbeat/reader/reader.go index 81ae4ad82412..43c389ac7c6e 100644 --- a/libbeat/reader/reader.go +++ b/libbeat/reader/reader.go @@ -18,7 +18,6 @@ package reader import ( - "errors" "io" ) @@ -30,8 +29,3 @@ type Reader interface { io.Closer Next() (Message, error) } - -var ( - //ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed - ErrLineUnparsable = errors.New("line is unparsable") -) diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index 59dded97ec3a..d57c61c6a268 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -202,7 +202,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { err = p.parseLine(&message, &logLine) if err != nil { p.logger.Errorf("Parse line error: %v", err) - return message, reader.ErrLineUnparsable + continue } // Handle multiline messages, join partial lines @@ -219,7 +219,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { err = p.parseLine(&next, &logLine) if err != nil { p.logger.Errorf("Parse line error: %v", err) - return message, reader.ErrLineUnparsable + continue } message.Content = append(message.Content, next.Content...) } diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index 2c9e2e711047..de03b87da81e 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -18,6 +18,7 @@ package readjson import ( + "io" "testing" "time" @@ -53,7 +54,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong JSON", input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 16, }, @@ -73,7 +74,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 37, }, @@ -82,7 +83,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 25, }, @@ -91,7 +92,7 @@ func TestDockerJSON(t *testing.T) { name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 82, }, @@ -218,7 +219,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", format: "cri", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 82, }, @@ -228,7 +229,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, stream: "all", format: "docker", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 115, }, @@ -300,7 +301,7 @@ func TestDockerJSON(t *testing.T) { []byte(`{"log":"shutdown...\n","stream`), }, stream: "stdout", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 139, }, @@ -324,11 +325,25 @@ func TestDockerJSON(t *testing.T) { name: "Corrupted log message line", input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 97, }, }, + { + name: "Corrupted log message line is skipped, keep correct bytes count", + input: [][]byte{ + []byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + }, + stream: "all", + expectedMessage: reader.Message{ + Content: []byte("1:M 09 Nov 13:27:36.276 # User requested"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 205, + }, + }, } for _, test := range tests { @@ -358,6 +373,12 @@ type mockReader struct { } func (m *mockReader) Next() (reader.Message, error) { + if len(m.messages) < 1 { + return reader.Message{ + Content: []byte{}, + Bytes: 0, + }, io.EOF + } message := m.messages[0] m.messages = m.messages[1:] return reader.Message{