Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #7859 to 6.4: Update go-txfile and dependencies #7911

Merged
merged 1 commit into from
Aug 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ https://github.com/elastic/beats/compare/v6.4.0...6.4[Check the HEAD diff]

- Fixed `add_host_metadata` not initializing correctly on Windows. {issue}7715[7715]
- Add backoff on error support to redis output. {pull}7781[7781]
- Fixed missing file unlock in spool file on Windows, so file can be reopened and locked. {pull}7859[7859]
- Fix spool file opening/creation failing due to file locking on Windows. {pull}7859[7859]
- Fix size of maximum mmaped read area in spool file on Windows. {pull}7859[7859]
- Fix potential data loss on OS X in spool file by using fcntl with F_FULLFSYNC. {pull}7859[7859]
- Improve fsync on linux, by assuming the kernel resets error flags of failed writes. {pull}7859[7859]
- Remove unix-like permission checks on Windows, so files can be opened. {issue}7849[7849]

*Auditbeat*

Expand All @@ -53,6 +59,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.4[Check the HEAD diff]
*Affecting all Beats*

- Allow for cloud-id to specify a custom port. This makes cloud-id work in ECE contexts. {pull}7887[7887]
- Add support to grow or shrink an existing spool file between restarts. {pull}7859[7859]

*Auditbeat*

Expand Down
6 changes: 3 additions & 3 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ Elasticsearch, B.V. (https://www.elastic.co/).

--------------------------------------------------------------------
Dependency: github.com/elastic/go-txfile
Version: v0.0.1
Revision: 7e7e33cc236f30fff545f3ee2c35ada5b70b6b13
Version: v0.0.3
Revision: 389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed
License type (autodetected): Apache-2.0
./vendor/github.com/elastic/go-txfile/LICENSE:
--------------------------------------------------------------------
Expand Down Expand Up @@ -2574,7 +2574,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/sys
Revision: 37707fdb30a5b38865cfb95e5aab41707daec7fd
Revision: 0ffbfd41fbef8ffcf9b62b0b0aa3a5873ed7a4fe
License type (autodetected): BSD-3-Clause
./vendor/golang.org/x/sys/LICENSE:
--------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
debugf("Initializing output plugins")
pipeline, err := pipeline.Load(b.Info, reg, b.Config.Pipeline, b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %v", err)
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func RunTests(
// reg := monitoring.NewRegistry()
pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output)
if err != nil {
return fmt.Errorf("loading pipeline failed: %v", err)
return fmt.Errorf("loading pipeline failed: %+v", err)
}
defer func() {
logp.Info("Stop pipeline")
Expand Down
36 changes: 22 additions & 14 deletions libbeat/publisher/queue/spool/inbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type inBroker struct {

// queue state
queue *pq.Queue
writer *pq.Writer
clientStates clientStates

// Event contents, that still needs to be send to the queue. An event is
Expand Down Expand Up @@ -83,6 +84,11 @@ func newInBroker(
return nil, err
}

writer, err := qu.Writer()
if err != nil {
return nil, err
}

b := &inBroker{
ctx: ctx,
eventer: eventer,
Expand All @@ -97,6 +103,7 @@ func newInBroker(

// queue state
queue: qu,
writer: writer,
clientStates: clientStates{},
pending: nil,
bufferedEvents: 0,
Expand Down Expand Up @@ -214,30 +221,32 @@ func (b *inBroker) eventLoop() {

// notify ackLoop to stop only after eventLoop has finished (after last flush)
defer close(b.ackDone)
defer b.eventloopShutdown()

for {
ok := b.state(b)
if !ok {
break
}
}
}

func (b *inBroker) eventloopShutdown() {
// try to flush events/buffers on shutdown.
if b.bufferedEvents == 0 {
return
}

// try to append pending events
// Try to flush pending events.
w := b.writer
for len(b.pending) > 0 {
n, err := b.queue.Writer().Write(b.pending)
n, err := w.Write(b.pending)
b.pending = b.pending[n:]
if err != nil {
return
}
}

// final flush
b.queue.Writer().Flush()
w.Flush()
}

// stateEmpty is the brokers active state if the write buffer is empty and the
Expand Down Expand Up @@ -442,7 +451,7 @@ func (b *inBroker) stateBlocked() bool {
b.pending = nil
err := b.writeEvent(tmp)
if err != nil || len(b.pending) > 0 {
log.Debug("writing pending event failed: ", err)
log.Debugf("writing pending event failed: %+v", err)
break
}
}
Expand Down Expand Up @@ -503,7 +512,7 @@ func (b *inBroker) addEvent(buf []byte, st clientState) error {
log.Debug(" add event -> active:", count)

err := b.writeEvent(buf)
log.Debug(" inbroker write ->", err, b.bufferedEvents)
log.Debugf(" inbroker write -> events=%v, err=%+v ", b.bufferedEvents, err)

return err
}
Expand All @@ -512,28 +521,27 @@ func (b *inBroker) writeEvent(buf []byte) error {
log := b.ctx.logger

// append event to queue
queueWriter := b.queue.Writer()
n, err := queueWriter.Write(buf)
w := b.writer
n, err := w.Write(buf)
buf = buf[n:]
if len(buf) > 0 {
b.pending = buf
} else if err == nil {
log.Debug("writer: finalize event in buffer")
err = queueWriter.Next()
err = w.Next()
}

if err != nil {
log := b.ctx.logger
log.Debugf("appending event content to write buffer failed with %v", err)
log.Debugf("Appending event content to write buffer failed with %+v", err)
}
return err
}

func (b *inBroker) flushBuffer() error {
err := b.queue.Writer().Flush()
err := b.writer.Flush()
if err != nil {
log := b.ctx.logger
log.Debugf("spool flush failed with: %v", err)
log.Errorf("Spool flush failed with: %+v", err)
}
return err
}
21 changes: 18 additions & 3 deletions libbeat/publisher/queue/spool/outbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type outBroker struct {

// queue state
queue *pq.Queue
reader *pq.Reader
available uint // number of available events. getRequests are only accepted if available > 0
events []publisher.Event
required int
Expand Down Expand Up @@ -81,6 +82,12 @@ var ackChanPool = sync.Pool{
var errRetry = errors.New("retry")

func newOutBroker(ctx *spoolCtx, qu *pq.Queue, flushTimeout time.Duration) (*outBroker, error) {
reader := qu.Reader()
avail, err := reader.Available()
if err != nil {
return nil, err
}

b := &outBroker{
ctx: ctx,
state: nil,
Expand All @@ -96,7 +103,8 @@ func newOutBroker(ctx *spoolCtx, qu *pq.Queue, flushTimeout time.Duration) (*out

// queue state
queue: qu,
available: qu.Reader().Available(),
reader: reader,
available: avail,
events: nil,
required: 0,
total: 0,
Expand Down Expand Up @@ -157,7 +165,7 @@ func (b *outBroker) ackLoop() {
log.Debugf("receive ACK of %v events\n", ackCh.total)
err := b.queue.ACK(uint(ackCh.total))
if err != nil {
log.Debug("ack failed with:", err)
log.Debugf("ack failed with: %+v", err)
time.Sleep(1 * time.Second)
continue
}
Expand Down Expand Up @@ -424,7 +432,14 @@ func (b *outBroker) collectEvents(
N int,
) ([]publisher.Event, int, error) {
log := b.ctx.logger
reader := b.queue.Reader()
reader := b.reader

// ensure all read operations happen within same transaction
err := reader.Begin()
if err != nil {
return nil, 0, err
}
defer reader.Done()

count := 0
for N > 0 {
Expand Down
7 changes: 4 additions & 3 deletions libbeat/publisher/queue/spool/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package spool
import (
"fmt"
"os"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -92,14 +93,14 @@ func NewSpool(logger logger, path string, settings Settings) (*Spool, error) {
if !os.IsNotExist(err) {
return nil, err
}
} else {
} else if runtime.GOOS != "windows" {
perm := info.Mode().Perm()
cfgPerm := settings.Mode.Perm()

// check if file has permissions set, that must not be set via config
if (perm | cfgPerm) != cfgPerm {
return nil, fmt.Errorf("file permissions must be more strict (required permissions: %v, actual permissions: %v)",
cfgPerm, perm)
return nil, fmt.Errorf("file permissions for '%v' must be more strict (required permissions: %v, actual permissions: %v)",
path, cfgPerm, perm)
}
}

Expand Down
47 changes: 47 additions & 0 deletions vendor/github.com/elastic/go-txfile/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading