From 2012b5e21dc7bae04345cef82a68eabcd8d9b2e3 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 7 Dec 2020 15:15:45 -0500 Subject: [PATCH] track frame index within segments --- libbeat/publisher/queue/diskqueue/acks.go | 18 ++++--- libbeat/publisher/queue/diskqueue/config.go | 3 +- .../queue/diskqueue/core_loop_test.go | 7 ++- libbeat/publisher/queue/diskqueue/queue.go | 22 +++++++++ libbeat/publisher/queue/diskqueue/segments.go | 49 ++++--------------- .../publisher/queue/diskqueue/state_file.go | 19 ++++++- 6 files changed, 65 insertions(+), 53 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/acks.go b/libbeat/publisher/queue/diskqueue/acks.go index ed9d7589db2d..60baf347d852 100644 --- a/libbeat/publisher/queue/diskqueue/acks.go +++ b/libbeat/publisher/queue/diskqueue/acks.go @@ -25,9 +25,14 @@ import ( ) // queuePosition represents a logical position within the queue buffer. +// The frame index is logically redundant with the segment offset, but +// calculating it requires a linear scan of the file so we store both values. +// Note that frameIndex is relative to the beginning of the segment, +// and has no relation to the sequence used by frameID. type queuePosition struct { - segmentID segmentID - offset segmentOffset + segmentID segmentID + offset segmentOffset + frameIndex uint64 } type diskQueueACKs struct { @@ -114,15 +119,16 @@ func (dqa *diskQueueACKs) addFrames(frames []*readFrame) { newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID] if ok { // This is the start of a new segment. Remove this frame from the - // segment boundary list and set the position to the start of the - // new segment. + // segment boundary list and reset the next position. delete(dqa.segmentBoundaries, dqa.nextFrameID) dqa.nextPosition = queuePosition{ - segmentID: newSegment, - offset: 0, + segmentID: newSegment, + offset: 0, + frameIndex: 0, } } dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID]) + dqa.nextPosition.frameIndex++ delete(dqa.frameSize, dqa.nextFrameID) } // We advanced the ACK position at least somewhat, so write its diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go index e4649933aff9..b8ef456d03d4 100644 --- a/libbeat/publisher/queue/diskqueue/config.go +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -190,8 +190,7 @@ func (settings Settings) segmentPath(segmentID segmentID) string { } func (settings Settings) maxSegmentOffset() segmentOffset { - return segmentOffset( - int(settings.MaxSegmentSize) - segmentHeaderSize) + return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize) } // Given a retry interval, nextRetryInterval returns the next higher level diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go index 829717a47683..33844057fbc9 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop_test.go +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -122,7 +122,7 @@ func TestHandleProducerWriteRequest(t *testing.T) { } settings := DefaultSettings() - settings.MaxSegmentSize = uint64(1000 + segmentHeaderSize) + settings.MaxSegmentSize = 1000 + segmentHeaderSize settings.MaxBufferSize = 10000 for description, test := range testCases { dq := &diskQueue{ @@ -973,12 +973,11 @@ func makeWriteFrameWithSize(size int) *writeFrame { } func segmentWithSize(size int) *queueSegment { - headerSize := segmentHeaderSize - if size < headerSize { + if size < segmentHeaderSize { // Can't have a segment smaller than the segment header return nil } - return &queueSegment{endOffset: segmentOffset(size - headerSize)} + return &queueSegment{endOffset: segmentOffset(size - segmentHeaderSize)} } func equalReaderLoopRequests( diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 664fb952bf6d..c5c0af151c78 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -137,6 +137,14 @@ func NewQueue(logger *logp.Logger, settings Settings) (queue.Queue, error) { // warning and fall back on the oldest existing segment, if any. logger.Warnf("Couldn't load most recent queue position: %v", err) } + if nextReadPosition.frameIndex == 0 { + // If the previous state was written by an older version, it may lack + // the frameIndex field. In this case we reset the read offset within + // the segment, which may cause one-time retransmission of some events + // from the previous session, but ensures that + // our metrics are consistent. + nextReadPosition.offset = 0 + } positionFile, err := os.OpenFile( settings.stateFilePath(), os.O_WRONLY|os.O_CREATE, 0600) if err != nil { @@ -178,6 +186,20 @@ func NewQueue(logger *logp.Logger, settings Settings) (queue.Queue, error) { nextReadPosition = queuePosition{segmentID: initialSegments[0].id} } + // We can compute the active frames right now but still need a way to report + // them to the global beat metrics. For now, just log the total. + // Note that for consistency with existing queue behavior, this excludes + // events that are still present on disk but were already sent and + // acknowledged on a previous run (we probably want to track these as well + // in the future.) + // TODO: pass in a context that queues can use to report these events. + activeFrameCount := 0 + for _, segment := range initialSegments { + activeFrameCount += int(segment.frameCount()) + } + activeFrameCount -= int(nextReadPosition.frameIndex) + logger.Infof("Found %d existing events on queue start", activeFrameCount) + queue := &diskQueue{ logger: logger, settings: settings, diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index f45bbace7347..eac1b8df0d6e 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -174,12 +174,6 @@ func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment, segments := []*queueSegment{} for _, file := range files { - /*if file.Size() <= segmentHeaderSize { - // Ignore segments that don't have at least some data beyond the - // header (this will always be true of segments we write unless there - // is an error). - continue - }*/ components := strings.Split(file.Name(), ".") if len(components) == 2 && strings.ToLower(components[1]) == "seg" { // Parse the id as base-10 64-bit unsigned int. We ignore file names that @@ -204,39 +198,6 @@ func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment, header: header, endOffset: segmentOffset(file.Size() - int64(header.sizeOnDisk())), }) - - //newSegment, err := prescanSegment(logger, segmentID(id), fullPath) - - // If a segment is returned with an error, this means we were able to - // read at least some data but the end of the file may be incomplete - // or corrupted. In this case we add it to our list and read as much of - // it as we can. - /*if newSegment == nil { - logger.Errorf("couldn't load segment file '%v': %v", fullPath, err) - } else { - if err != nil { - logger.Warnf( - "error loading segment file '%v', data may be incomplete: %v", - fullPath, err) - } - segments = append(segments, newSegment) - }*/ - /*frameCount, err := readFrameCount(path.Join(pathStr, file.Name())) - if frameCount == 0 { - logger.Errorf("") - } else { - if err != nil { - // If there is an error but frameCount is still positive, it means - // we - logger.Warnf( - "Error") - } - segments = append(segments, &queueSegment{ - id: segmentID(id), - endOffset: segmentOffset(file.Size() - segmentHeaderSize), - framesWritten: frameCount, - }) - }*/ } } } @@ -254,6 +215,16 @@ func (segment *queueSegment) sizeOnDisk() uint64 { return uint64(segment.endOffset) + uint64(headerSize) } +// Returns the number of frames in the segment, derived either from the +// segment header if the segment is from a previous session, or from the +// framesWritten field otherwise. +func (segment *queueSegment) frameCount() uint32 { + if segment.header != nil { + return segment.header.frameCount + } + return segment.framesWritten +} + // A helper function that returns the number of frames in an existing // segment file, used during startup to count how many events are // pending in the queue from a previous session. diff --git a/libbeat/publisher/queue/diskqueue/state_file.go b/libbeat/publisher/queue/diskqueue/state_file.go index d8cbb5690ac2..59e062093dcb 100644 --- a/libbeat/publisher/queue/diskqueue/state_file.go +++ b/libbeat/publisher/queue/diskqueue/state_file.go @@ -24,6 +24,8 @@ import ( "os" ) +const currentStateFileVersion = 1 + // Given an open file handle to the queue state, decode the current position // and return the result if successful, otherwise an error. func queuePositionFromHandle( @@ -40,7 +42,7 @@ func queuePositionFromHandle( if err != nil { return queuePosition{}, err } - if version != 0 { + if version > currentStateFileVersion { return queuePosition{}, fmt.Errorf("Unsupported queue metadata version (%d)", version) } @@ -57,6 +59,14 @@ func queuePositionFromHandle( return queuePosition{}, err } + // frameIndex field was added in schema version 1 + if version >= 1 { + err = binary.Read( + reader, binary.LittleEndian, &position.frameIndex) + if err != nil { + return queuePosition{}, err + } + } return position, nil } @@ -82,7 +92,8 @@ func writeQueuePositionToHandle( } // Want to write: version (0), segment id, segment offset. - err = binary.Write(file, binary.LittleEndian, uint32(0)) + err = binary.Write( + file, binary.LittleEndian, uint32(currentStateFileVersion)) if err != nil { return err } @@ -91,5 +102,9 @@ func writeQueuePositionToHandle( return err } err = binary.Write(file, binary.LittleEndian, position.offset) + if err != nil { + return err + } + err = binary.Write(file, binary.LittleEndian, position.frameIndex) return err }