Skip to content

Commit

Permalink
Update go-txfile and dependencies (elastic#7859)
Browse files Browse the repository at this point in the history
* Update go-txfile

* Update golang.org/x/sys

go-txfile requires a newer version of golang.org/x/sys

* Update spool to take API changes into account

* Ensure spool opening root cause is included when printing the error message

* cleanup vendor after govendor fails

* Update notice file

* Update changelog

* Fix build

* Update go-txfile to 0.0.3

(cherry picked from commit 10f41be)
  • Loading branch information
Steffen Siering committed Aug 8, 2018
1 parent e9a3d87 commit f383960
Show file tree
Hide file tree
Showing 218 changed files with 28,800 additions and 7,864 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
*Affecting all Beats*

- Fixed `add_host_metadata` not initializing correctly on Windows. {issue}7715[7715]
- Fixed missing file unlock in spool file on Windows, so file can be reopened and locked. {pull}7859[7859]
- Fix spool file opening/creation failing due to file locking on Windows. {pull}7859[7859]
- Fix size of maximum mmaped read area in spool file on Windows. {pull}7859[7859]
- Fix potential data loss on OS X in spool file by using fcntl with F_FULLFSYNC. {pull}7859[7859]
- Improve fsync on linux, by assuming the kernel resets error flags of failed writes. {pull}7859[7859]
- Remove unix-like permission checks on Windows, so files can be opened. {issue}7849[7849]

*Auditbeat*

Expand All @@ -52,6 +58,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]

- Add backoff on error support to redis output. {pull}7781[7781]
- Allow for cloud-id to specify a custom port. This makes cloud-id work in ECE contexts. {pull}7887[7887]
- Add support to grow or shrink an existing spool file between restarts. {pull}7859[7859]

*Auditbeat*

Expand Down
6 changes: 3 additions & 3 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ Elasticsearch, B.V. (https://www.elastic.co/).

--------------------------------------------------------------------
Dependency: github.com/elastic/go-txfile
Version: v0.0.1
Revision: 7e7e33cc236f30fff545f3ee2c35ada5b70b6b13
Version: v0.0.3
Revision: 389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed
License type (autodetected): Apache-2.0
./vendor/github.com/elastic/go-txfile/LICENSE:
--------------------------------------------------------------------
Expand Down Expand Up @@ -2574,7 +2574,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/sys
Revision: 37707fdb30a5b38865cfb95e5aab41707daec7fd
Revision: 0ffbfd41fbef8ffcf9b62b0b0aa3a5873ed7a4fe
License type (autodetected): BSD-3-Clause
./vendor/golang.org/x/sys/LICENSE:
--------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
debugf("Initializing output plugins")
pipeline, err := pipeline.Load(b.Info, reg, b.Config.Pipeline, b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %v", err)
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func RunTests(
// reg := monitoring.NewRegistry()
pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output)
if err != nil {
return fmt.Errorf("loading pipeline failed: %v", err)
return fmt.Errorf("loading pipeline failed: %+v", err)
}
defer func() {
logp.Info("Stop pipeline")
Expand Down
36 changes: 22 additions & 14 deletions libbeat/publisher/queue/spool/inbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type inBroker struct {

// queue state
queue *pq.Queue
writer *pq.Writer
clientStates clientStates

// Event contents, that still needs to be send to the queue. An event is
Expand Down Expand Up @@ -83,6 +84,11 @@ func newInBroker(
return nil, err
}

writer, err := qu.Writer()
if err != nil {
return nil, err
}

b := &inBroker{
ctx: ctx,
eventer: eventer,
Expand All @@ -97,6 +103,7 @@ func newInBroker(

// queue state
queue: qu,
writer: writer,
clientStates: clientStates{},
pending: nil,
bufferedEvents: 0,
Expand Down Expand Up @@ -214,30 +221,32 @@ func (b *inBroker) eventLoop() {

// notify ackLoop to stop only after eventLoop has finished (after last flush)
defer close(b.ackDone)
defer b.eventloopShutdown()

for {
ok := b.state(b)
if !ok {
break
}
}
}

func (b *inBroker) eventloopShutdown() {
// try to flush events/buffers on shutdown.
if b.bufferedEvents == 0 {
return
}

// try to append pending events
// Try to flush pending events.
w := b.writer
for len(b.pending) > 0 {
n, err := b.queue.Writer().Write(b.pending)
n, err := w.Write(b.pending)
b.pending = b.pending[n:]
if err != nil {
return
}
}

// final flush
b.queue.Writer().Flush()
w.Flush()
}

// stateEmpty is the brokers active state if the write buffer is empty and the
Expand Down Expand Up @@ -442,7 +451,7 @@ func (b *inBroker) stateBlocked() bool {
b.pending = nil
err := b.writeEvent(tmp)
if err != nil || len(b.pending) > 0 {
log.Debug("writing pending event failed: ", err)
log.Debugf("writing pending event failed: %+v", err)
break
}
}
Expand Down Expand Up @@ -503,7 +512,7 @@ func (b *inBroker) addEvent(buf []byte, st clientState) error {
log.Debug(" add event -> active:", count)

err := b.writeEvent(buf)
log.Debug(" inbroker write ->", err, b.bufferedEvents)
log.Debugf(" inbroker write -> events=%v, err=%+v ", b.bufferedEvents, err)

return err
}
Expand All @@ -512,28 +521,27 @@ func (b *inBroker) writeEvent(buf []byte) error {
log := b.ctx.logger

// append event to queue
queueWriter := b.queue.Writer()
n, err := queueWriter.Write(buf)
w := b.writer
n, err := w.Write(buf)
buf = buf[n:]
if len(buf) > 0 {
b.pending = buf
} else if err == nil {
log.Debug("writer: finalize event in buffer")
err = queueWriter.Next()
err = w.Next()
}

if err != nil {
log := b.ctx.logger
log.Debugf("appending event content to write buffer failed with %v", err)
log.Debugf("Appending event content to write buffer failed with %+v", err)
}
return err
}

func (b *inBroker) flushBuffer() error {
err := b.queue.Writer().Flush()
err := b.writer.Flush()
if err != nil {
log := b.ctx.logger
log.Debugf("spool flush failed with: %v", err)
log.Errorf("Spool flush failed with: %+v", err)
}
return err
}
21 changes: 18 additions & 3 deletions libbeat/publisher/queue/spool/outbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type outBroker struct {

// queue state
queue *pq.Queue
reader *pq.Reader
available uint // number of available events. getRequests are only accepted if available > 0
events []publisher.Event
required int
Expand Down Expand Up @@ -81,6 +82,12 @@ var ackChanPool = sync.Pool{
var errRetry = errors.New("retry")

func newOutBroker(ctx *spoolCtx, qu *pq.Queue, flushTimeout time.Duration) (*outBroker, error) {
reader := qu.Reader()
avail, err := reader.Available()
if err != nil {
return nil, err
}

b := &outBroker{
ctx: ctx,
state: nil,
Expand All @@ -96,7 +103,8 @@ func newOutBroker(ctx *spoolCtx, qu *pq.Queue, flushTimeout time.Duration) (*out

// queue state
queue: qu,
available: qu.Reader().Available(),
reader: reader,
available: avail,
events: nil,
required: 0,
total: 0,
Expand Down Expand Up @@ -157,7 +165,7 @@ func (b *outBroker) ackLoop() {
log.Debugf("receive ACK of %v events\n", ackCh.total)
err := b.queue.ACK(uint(ackCh.total))
if err != nil {
log.Debug("ack failed with:", err)
log.Debugf("ack failed with: %+v", err)
time.Sleep(1 * time.Second)
continue
}
Expand Down Expand Up @@ -424,7 +432,14 @@ func (b *outBroker) collectEvents(
N int,
) ([]publisher.Event, int, error) {
log := b.ctx.logger
reader := b.queue.Reader()
reader := b.reader

// ensure all read operations happen within same transaction
err := reader.Begin()
if err != nil {
return nil, 0, err
}
defer reader.Done()

count := 0
for N > 0 {
Expand Down
7 changes: 4 additions & 3 deletions libbeat/publisher/queue/spool/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package spool
import (
"fmt"
"os"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -92,14 +93,14 @@ func NewSpool(logger logger, path string, settings Settings) (*Spool, error) {
if !os.IsNotExist(err) {
return nil, err
}
} else {
} else if runtime.GOOS != "windows" {
perm := info.Mode().Perm()
cfgPerm := settings.Mode.Perm()

// check if file has permissions set, that must not be set via config
if (perm | cfgPerm) != cfgPerm {
return nil, fmt.Errorf("file permissions must be more strict (required permissions: %v, actual permissions: %v)",
cfgPerm, perm)
return nil, fmt.Errorf("file permissions for '%v' must be more strict (required permissions: %v, actual permissions: %v)",
path, cfgPerm, perm)
}
}

Expand Down
47 changes: 47 additions & 0 deletions vendor/github.com/elastic/go-txfile/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f383960

Please sign in to comment.