diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index eac1b8df0d6..eaf726d2c8c 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -371,14 +371,26 @@ func readSegmentHeaderWithFrameCount(path string) (*segmentHeader, error) { // All other errors mean we are done scanning, exit the loop. break } - // Try to advance to the next frame. - _, err = file.Seek(int64(frameLength), os.SEEK_CUR) + // Length is encoded in both the first and last four bytes of a frame. To + // detect truncated / corrupted frames, seek to the last four bytes of + // the current frame to make sure the trailing length matches before + // advancing to the next frame (otherwise we might accept an impossible + // length). + _, err = file.Seek(int64(frameLength-8), os.SEEK_CUR) if err != nil { - // An error in seeking probably means an invalid length, which - // indicates a truncated frame or data corruption, so end the - // loop without including it in our count. break } + var duplicateLength uint32 + err = binary.Read(reader, binary.LittleEndian, &duplicateLength) + if err != nil { + break + } + if frameLength != duplicateLength { + err = fmt.Errorf( + "mismatched frame length: %v vs %v", frameLength, duplicateLength) + break + } + header.frameCount++ } // If we ended up here instead of returning directly, then