Skip to content

Commit

Permalink
[Filebeat] Fix offset field pointing at end of line for #6514
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear authored and ph committed Jun 15, 2018
1 parent 2d458f2 commit df724c9
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]
- Keep different registry entry per container stream to avoid wrong offsets. {issue}7281[7281]
- Optimize PostgreSQL ingest pipeline to use anchored regexp and merge multiple regexp into a single expression. {pull}7269[7269]
- Fix offset field pointing at end of a line. {issue}6514[6514]

*Heartbeat*
- Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616]
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func (h *Harvester) Run() error {
// This is important in case sending is not successful so on shutdown
// the old offset is reported
state := h.getState()
startingOffset := state.Offset
state.Offset += int64(message.Bytes)

// Create state event
Expand All @@ -281,7 +282,7 @@ func (h *Harvester) Run() error {
if !message.IsEmpty() && h.shouldExportLine(text) {
fields := common.MapStr{
"source": state.Source,
"offset": state.Offset, // Offset here is the offset before the starting char.
"offset": startingOffset, // Offset here is the offset before the starting char.
}
fields.DeepUpdate(message.Fields)

Expand Down
9 changes: 7 additions & 2 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ def test_shutdown_wait_ok(self):

# we allow for a potential race in the harvester shutdown here.
# In some cases the registry offset might match the penultimate offset.
assert (offset == outputs[-1]["offset"] or
offset == outputs[-2]["offset"])

eol_offset = 1
if os.name == "nt":
eol_offset += 1

assert (offset == (outputs[-1]["offset"] + eol_offset + len(outputs[-1]["message"])) or
offset == (outputs[-2]["offset"] + eol_offset + len(outputs[-2]["message"])))

def test_shutdown_wait_timeout(self):
"""
Expand Down

0 comments on commit df724c9

Please sign in to comment.