diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go index f39f608361de..6a165a665db2 100644 --- a/libbeat/publisher/queue/diskqueue/config.go +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -106,8 +106,8 @@ func DefaultSettings() Settings { MaxSegmentSize: 100 * (1 << 20), // 100MiB MaxBufferSize: (1 << 30), // 1GiB - ReadAheadLimit: 256, - WriteAheadLimit: 1024, + ReadAheadLimit: 512, + WriteAheadLimit: 2048, } } @@ -129,6 +129,14 @@ func SettingsForUserConfig(config *common.Config) (Settings, error) { // divided by 10. settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10 } + + if userConfig.ReadAheadLimit != nil { + settings.ReadAheadLimit = *userConfig.ReadAheadLimit + } + if userConfig.WriteAheadLimit != nil { + settings.WriteAheadLimit = *userConfig.WriteAheadLimit + } + return settings, nil } diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index 56a50b5a4225..638d9da2f40b 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -58,10 +58,15 @@ func (dq *diskQueue) run() { // The writer loop completed a request, so check if there is more // data to be sent. dq.maybeWritePending() - // We also check whether the reader loop is waiting for the data - // that was just written. + + // The data that was just written is now available for reading, so check + // if we should start a new read request. dq.maybeReadPending() + // pendingFrames should now be empty. If any producers were blocked + // because pendingFrames hit settings.WriteAheadLimit, wake them up. + dq.maybeUnblockProducers() + // Reader loop handling case readerLoopResponse := <-dq.readerLoop.responseChan: dq.handleReaderLoopResponse(readerLoopResponse) @@ -417,22 +422,25 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) { }) } -// canAcceptFrameOfSize checks whether there is enough free space in the -// queue (subject to settings.MaxBufferSize) to accept a new frame with -// the given size. Size includes both the serialized data and the frame -// header / footer; the easy way to do this for a writeFrame is to pass +// canAcceptFrameOfSize checks whether there is enough free space in the queue +// (subject to settings.{MaxBufferSize, WriteAheadLimit}) to accept a new +// frame with the given size. Size includes both the serialized data and the +// frame header / footer; the easy way to do this for a writeFrame is to pass // in frame.sizeOnDisk(). // Capacity calculations do not include requests in the blockedProducers // list (that data is owned by its callers and we can't touch it until // we are ready to respond). That allows this helper to be used both while // handling producer requests and while deciding whether to unblock // producers after free capacity increases. -// If we decide to add limits on how many events / bytes can be stored -// in pendingFrames (to avoid unbounded memory use if the input is faster -// than the disk), this is the function to modify. func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool { + // If pendingFrames is already at the WriteAheadLimit, we can't accept + // any new frames right now. + if len(dq.pendingFrames) >= dq.settings.WriteAheadLimit { + return false + } + + // If the queue size is unbounded (max == 0), we accept. if dq.settings.MaxBufferSize == 0 { - // Currently we impose no limitations if the queue size is unbounded. return true }