Skip to content

Commit

Permalink
track frame index within segments
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Dec 7, 2020
1 parent ddd29bd commit 2012b5e
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 53 deletions.
18 changes: 12 additions & 6 deletions libbeat/publisher/queue/diskqueue/acks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ import (
)

// queuePosition represents a logical position within the queue buffer.
// The frame index is logically redundant with the segment offset, but
// calculating it requires a linear scan of the file so we store both values.
// Note that frameIndex is relative to the beginning of the segment,
// and has no relation to the sequence used by frameID.
type queuePosition struct {
segmentID segmentID
offset segmentOffset
segmentID segmentID
offset segmentOffset
frameIndex uint64
}

type diskQueueACKs struct {
Expand Down Expand Up @@ -114,15 +119,16 @@ func (dqa *diskQueueACKs) addFrames(frames []*readFrame) {
newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID]
if ok {
// This is the start of a new segment. Remove this frame from the
// segment boundary list and set the position to the start of the
// new segment.
// segment boundary list and reset the next position.
delete(dqa.segmentBoundaries, dqa.nextFrameID)
dqa.nextPosition = queuePosition{
segmentID: newSegment,
offset: 0,
segmentID: newSegment,
offset: 0,
frameIndex: 0,
}
}
dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID])
dqa.nextPosition.frameIndex++
delete(dqa.frameSize, dqa.nextFrameID)
}
// We advanced the ACK position at least somewhat, so write its
Expand Down
3 changes: 1 addition & 2 deletions libbeat/publisher/queue/diskqueue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
}

func (settings Settings) maxSegmentOffset() segmentOffset {
return segmentOffset(
int(settings.MaxSegmentSize) - segmentHeaderSize)
return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
}

// Given a retry interval, nextRetryInterval returns the next higher level
Expand Down
7 changes: 3 additions & 4 deletions libbeat/publisher/queue/diskqueue/core_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestHandleProducerWriteRequest(t *testing.T) {
}

settings := DefaultSettings()
settings.MaxSegmentSize = uint64(1000 + segmentHeaderSize)
settings.MaxSegmentSize = 1000 + segmentHeaderSize
settings.MaxBufferSize = 10000
for description, test := range testCases {
dq := &diskQueue{
Expand Down Expand Up @@ -973,12 +973,11 @@ func makeWriteFrameWithSize(size int) *writeFrame {
}

func segmentWithSize(size int) *queueSegment {
headerSize := segmentHeaderSize
if size < headerSize {
if size < segmentHeaderSize {
// Can't have a segment smaller than the segment header
return nil
}
return &queueSegment{endOffset: segmentOffset(size - headerSize)}
return &queueSegment{endOffset: segmentOffset(size - segmentHeaderSize)}
}

func equalReaderLoopRequests(
Expand Down
22 changes: 22 additions & 0 deletions libbeat/publisher/queue/diskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ func NewQueue(logger *logp.Logger, settings Settings) (queue.Queue, error) {
// warning and fall back on the oldest existing segment, if any.
logger.Warnf("Couldn't load most recent queue position: %v", err)
}
if nextReadPosition.frameIndex == 0 {
// If the previous state was written by an older version, it may lack
// the frameIndex field. In this case we reset the read offset within
// the segment, which may cause one-time retransmission of some events
// from the previous session, but ensures that
// our metrics are consistent.
nextReadPosition.offset = 0
}
positionFile, err := os.OpenFile(
settings.stateFilePath(), os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
Expand Down Expand Up @@ -178,6 +186,20 @@ func NewQueue(logger *logp.Logger, settings Settings) (queue.Queue, error) {
nextReadPosition = queuePosition{segmentID: initialSegments[0].id}
}

// We can compute the active frames right now but still need a way to report
// them to the global beat metrics. For now, just log the total.
// Note that for consistency with existing queue behavior, this excludes
// events that are still present on disk but were already sent and
// acknowledged on a previous run (we probably want to track these as well
// in the future.)
// TODO: pass in a context that queues can use to report these events.
activeFrameCount := 0
for _, segment := range initialSegments {
activeFrameCount += int(segment.frameCount())
}
activeFrameCount -= int(nextReadPosition.frameIndex)
logger.Infof("Found %d existing events on queue start", activeFrameCount)

queue := &diskQueue{
logger: logger,
settings: settings,
Expand Down
49 changes: 10 additions & 39 deletions libbeat/publisher/queue/diskqueue/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,6 @@ func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment,

segments := []*queueSegment{}
for _, file := range files {
/*if file.Size() <= segmentHeaderSize {
// Ignore segments that don't have at least some data beyond the
// header (this will always be true of segments we write unless there
// is an error).
continue
}*/
components := strings.Split(file.Name(), ".")
if len(components) == 2 && strings.ToLower(components[1]) == "seg" {
// Parse the id as base-10 64-bit unsigned int. We ignore file names that
Expand All @@ -204,39 +198,6 @@ func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment,
header: header,
endOffset: segmentOffset(file.Size() - int64(header.sizeOnDisk())),
})

//newSegment, err := prescanSegment(logger, segmentID(id), fullPath)

// If a segment is returned with an error, this means we were able to
// read at least some data but the end of the file may be incomplete
// or corrupted. In this case we add it to our list and read as much of
// it as we can.
/*if newSegment == nil {
logger.Errorf("couldn't load segment file '%v': %v", fullPath, err)
} else {
if err != nil {
logger.Warnf(
"error loading segment file '%v', data may be incomplete: %v",
fullPath, err)
}
segments = append(segments, newSegment)
}*/
/*frameCount, err := readFrameCount(path.Join(pathStr, file.Name()))
if frameCount == 0 {
logger.Errorf("")
} else {
if err != nil {
// If there is an error but frameCount is still positive, it means
// we
logger.Warnf(
"Error")
}
segments = append(segments, &queueSegment{
id: segmentID(id),
endOffset: segmentOffset(file.Size() - segmentHeaderSize),
framesWritten: frameCount,
})
}*/
}
}
}
Expand All @@ -254,6 +215,16 @@ func (segment *queueSegment) sizeOnDisk() uint64 {
return uint64(segment.endOffset) + uint64(headerSize)
}

// Returns the number of frames in the segment, derived either from the
// segment header if the segment is from a previous session, or from the
// framesWritten field otherwise.
func (segment *queueSegment) frameCount() uint32 {
if segment.header != nil {
return segment.header.frameCount
}
return segment.framesWritten
}

// A helper function that returns the number of frames in an existing
// segment file, used during startup to count how many events are
// pending in the queue from a previous session.
Expand Down
19 changes: 17 additions & 2 deletions libbeat/publisher/queue/diskqueue/state_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"os"
)

const currentStateFileVersion = 1

// Given an open file handle to the queue state, decode the current position
// and return the result if successful, otherwise an error.
func queuePositionFromHandle(
Expand All @@ -40,7 +42,7 @@ func queuePositionFromHandle(
if err != nil {
return queuePosition{}, err
}
if version != 0 {
if version > currentStateFileVersion {
return queuePosition{},
fmt.Errorf("Unsupported queue metadata version (%d)", version)
}
Expand All @@ -57,6 +59,14 @@ func queuePositionFromHandle(
return queuePosition{}, err
}

// frameIndex field was added in schema version 1
if version >= 1 {
err = binary.Read(
reader, binary.LittleEndian, &position.frameIndex)
if err != nil {
return queuePosition{}, err
}
}
return position, nil
}

Expand All @@ -82,7 +92,8 @@ func writeQueuePositionToHandle(
}

// Want to write: version (0), segment id, segment offset.
err = binary.Write(file, binary.LittleEndian, uint32(0))
err = binary.Write(
file, binary.LittleEndian, uint32(currentStateFileVersion))
if err != nil {
return err
}
Expand All @@ -91,5 +102,9 @@ func writeQueuePositionToHandle(
return err
}
err = binary.Write(file, binary.LittleEndian, position.offset)
if err != nil {
return err
}
err = binary.Write(file, binary.LittleEndian, position.frameIndex)
return err
}

0 comments on commit 2012b5e

Please sign in to comment.