[docker plugin] Refactor pipeline reader for data safety (#14375)
* refactor pipeline reader for data safety
fearful-symmetry authored Nov 13, 2019
1 parent 6c25ff1 commit 4cc73ac
Showing 23 changed files with 138 additions and 1,365 deletions.
9 changes: 0 additions & 9 deletions NOTICE.txt
@@ -5415,15 +5415,6 @@ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: github.com/tonistiigi/fifo
Revision: bda0ff6ed73c67bfb5e62bc9c697f146b7fd7f13
License type (autodetected): Apache-2.0
./x-pack/dockerlogbeat/vendor/github.com/tonistiigi/fifo/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/tsg/gopacket
Revision: 7c5392a5f2b5c5fa393c71e1f9064e22b5408995
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/handlers.go
@@ -11,6 +11,7 @@ import (
"path/filepath"

"github.com/docker/engine/daemon/logger"

"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipelinemanager"

@@ -54,7 +55,7 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon
return
}

go cl.ConsumeAndSendLogs()
go cl.ConsumePipelineAndSend()

respondOK(w)
} // end func
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/magefile.go
@@ -9,10 +9,11 @@ package main
import (
"os"

"github.com/elastic/beats/dev-tools/mage"
"github.com/magefile/mage/mg"
"github.com/magefile/mage/sh"

"github.com/elastic/beats/dev-tools/mage"

"github.com/pkg/errors"
)

1 change: 1 addition & 0 deletions x-pack/dockerlogbeat/main.go
@@ -9,6 +9,7 @@ import (
"os"

"github.com/docker/go-plugins-helpers/sdk"

"github.com/elastic/beats/libbeat/common"
logpcfg "github.com/elastic/beats/libbeat/logp/configure"
_ "github.com/elastic/beats/libbeat/outputs/console"
44 changes: 14 additions & 30 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
@@ -5,32 +5,28 @@
package pipelinemanager

import (
"context"
"encoding/binary"
"fmt"
"io"
"os"
"strings"
"syscall"
"time"

"github.com/docker/engine/api/types/plugins/logdriver"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/pipeline"
pb "github.com/gogo/protobuf/io"
"github.com/pkg/errors"
"github.com/tonistiigi/fifo"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipereader"
)

// ClientLogger is an instance of a pipeline logger client meant for reading from a single log stream
// There's a many-to-one relationship between clients and pipelines.
// Each container with the same config will get its own client to the same pipeline.
type ClientLogger struct {
logFile io.ReadWriteCloser
logFile *pipereader.PipeReader
client beat.Client
pipelineHash string
closer chan struct{}
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
@@ -49,52 +45,40 @@ func newClientFromPipeline(pipeline *pipeline.Pipeline, file, hashstring string)
}

// Create the FIFO reader client from the FIFO pipe
inputFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, 0700)
inputFile, err := pipereader.NewReaderFromPath(file)
if err != nil {
return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
}
logp.Info("Created new logger for %s", file)

return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring}, nil
return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring, closer: make(chan struct{})}, nil
}

// Close closes the pipeline client and reader
func (cl *ClientLogger) Close() error {
logp.Info("Closing ClientLogger")
cl.client.Close()
cl.logFile.Close()
return cl.client.Close()

return cl.logFile.Close()
}

// ConsumeAndSendLogs reads from the FIFO file and sends to the pipeline client. This will block and should be called in its own goroutine
// TODO: Publish() can block, which is a problem. This whole thing should be two goroutines.
func (cl *ClientLogger) ConsumeAndSendLogs() {
reader := pb.NewUint32DelimitedReader(cl.logFile, binary.BigEndian, 2e6)

// ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client
func (cl *ClientLogger) ConsumePipelineAndSend() {
publishWriter := make(chan logdriver.LogEntry, 500)

go cl.publishLoop(publishWriter)
// Clean up the reader after we're done
defer func() {

close(publishWriter)

err := reader.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "Error closing FIFO reader: %s", err)
}
}()

var log logdriver.LogEntry
for {
err := reader.ReadMsg(&log)
err := cl.logFile.ReadMessage(&log)
if err != nil {
if err == io.EOF || err == os.ErrClosed || strings.Contains(err.Error(), "file already closed") {
cl.logFile.Close()
return
}
// I am...not sure why we do this
reader = pb.NewUint32DelimitedReader(cl.logFile, binary.BigEndian, 2e6)
fmt.Fprintf(os.Stderr, "Error getting message: %s\n", err)
return
}
publishWriter <- log
log.Reset()
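The hunk above is folded before it reaches publishLoop, the goroutine that drains publishWriter. As a rough sketch only, reusing this file's existing imports (the body and field mapping below are illustrative assumptions, not the committed code), the publish side of the split looks something like this:

func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
	// Drain the channel until ConsumePipelineAndSend closes it.
	for entry := range reader {
		// Map the raw docker log entry onto a beat event.
		// The field names here are illustrative assumptions.
		cl.client.Publish(beat.Event{
			Timestamp: time.Unix(0, entry.TimeNano),
			Fields: common.MapStr{
				"message": string(entry.Line),
			},
		})
	}
}

Moving the blocking Publish() call into its own goroutine is what resolves the old TODO removed above: the reader keeps draining the FIFO even while the pipeline applies backpressure.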
5 changes: 3 additions & 2 deletions x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
@@ -9,15 +9,16 @@ import (
"fmt"
"sort"

"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"
)

// makeConfigHash is the helper function that turns a user config into a hash
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
@@ -8,10 +8,11 @@ import (
"fmt"
"sync"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/pkg/errors"
)

// containerConfig is the common.Config unpacking type
114 changes: 114 additions & 0 deletions x-pack/dockerlogbeat/pipereader/reader.go
@@ -0,0 +1,114 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pipereader

import (
"context"
"encoding/binary"
"io"
"syscall"

"github.com/containerd/fifo"
"github.com/docker/engine/api/types/plugins/logdriver"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)

// PipeReader reads from the FIFO pipe we get from the docker container
type PipeReader struct {
fifoPipe io.ReadCloser
byteOrder binary.ByteOrder
lenFrameBuf []byte
bodyBuf []byte
maxSize int
}

// NewReaderFromPath creates a new FIFO pipe reader from a docker log pipe location
func NewReaderFromPath(file string) (*PipeReader, error) {
inputFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, 0700)
if err != nil {
return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
}

return &PipeReader{fifoPipe: inputFile, byteOrder: binary.BigEndian, lenFrameBuf: make([]byte, 4), bodyBuf: nil, maxSize: 2e6}, nil
}

// NewReaderFromReadCloser creates a new FIFO pipe reader from an existing ReadCloser
func NewReaderFromReadCloser(pipe io.ReadCloser) (*PipeReader, error) {
return &PipeReader{fifoPipe: pipe, byteOrder: binary.BigEndian, lenFrameBuf: make([]byte, 4), bodyBuf: nil, maxSize: 2e6}, nil
}

// ReadMessage reads a log message from the pipe.
// The message stream consists of a 4-byte length frame and a message body.
// There are three logical paths for this code to take:
// 1) If length <= 0, we have bad data, and we cycle through the frames until we get a valid length.
// 2) If the length is valid but larger than the max buffer size, we discard length bytes and continue.
// 3) If the length is valid and the message fits in the buffer, we read it and continue.
func (reader *PipeReader) ReadMessage(log *logdriver.LogEntry) error {
// loop until we're at a valid state and ready to read a message body
var lenFrame int
var err error
for {
lenFrame, err = reader.getValidLengthFrame()
if err != nil {
return errors.Wrap(err, "error getting length frame")
}
if lenFrame <= reader.maxSize {
break
}

// 2) we have a too-large message: discard length bytes from the pipe
_, err = io.CopyBuffer(ioutil.Discard, io.LimitReader(reader.fifoPipe, int64(lenFrame)), reader.bodyBuf)
if err != nil {
return errors.Wrap(err, "error emptying buffer")
}
}

// proceed with 3): read the message body and unmarshal it
readBuf := reader.setBuffer(lenFrame)
_, err = io.ReadFull(reader.fifoPipe, readBuf[:lenFrame])
if err != nil {
return errors.Wrap(err, "error reading buffer")
}
return proto.Unmarshal(readBuf[:lenFrame], log)
}

// setBuffer checks the needed size, and returns a buffer, allocating a new buffer if needed
func (reader *PipeReader) setBuffer(sz int) []byte {
const minSz = 1024
const maxSz = minSz * 32

// return only the portion of the buffer we need
if len(reader.bodyBuf) >= sz {
return reader.bodyBuf[:sz]
}

// if we have an abnormally large buffer, don't set it to bodyBuf so GC can clean it up
if sz > maxSz {
return make([]byte, sz)
}

reader.bodyBuf = make([]byte, sz)
return reader.bodyBuf
}

// getValidLengthFrame reads length frames from the pipe until we get a valid (positive) length
func (reader *PipeReader) getValidLengthFrame() (int, error) {
for {
if _, err := io.ReadFull(reader.fifoPipe, reader.lenFrameBuf); err != nil {
return 0, err
}
bodyLen := int(reader.byteOrder.Uint32(reader.lenFrameBuf))
if bodyLen > 0 {
return bodyLen, nil
}
}
}

// Close closes the reader and underlying pipe
func (reader *PipeReader) Close() error {
return reader.fifoPipe.Close()
}
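
For reference, the framing ReadMessage expects is the docker log-driver stream protocol: a 4-byte big-endian length frame followed by a protobuf-encoded LogEntry. Below is a minimal, self-contained sketch that exercises the reader through NewReaderFromReadCloser with an in-memory frame; the in-memory pipe is purely for illustration, and real callers open the container FIFO via NewReaderFromPath.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io/ioutil"

	"github.com/docker/engine/api/types/plugins/logdriver"
	"github.com/gogo/protobuf/proto"

	"github.com/elastic/beats/x-pack/dockerlogbeat/pipereader"
)

func main() {
	// Encode one LogEntry the way the docker log driver would.
	body, err := proto.Marshal(&logdriver.LogEntry{Line: []byte("hello"), TimeNano: 0})
	if err != nil {
		panic(err)
	}

	// Prepend the 4-byte big-endian length frame that getValidLengthFrame reads.
	frame := make([]byte, 4+len(body))
	binary.BigEndian.PutUint32(frame, uint32(len(body)))
	copy(frame[4:], body)

	// Read the message back through the PipeReader.
	reader, err := pipereader.NewReaderFromReadCloser(ioutil.NopCloser(bytes.NewReader(frame)))
	if err != nil {
		panic(err)
	}
	var entry logdriver.LogEntry
	if err := reader.ReadMessage(&entry); err != nil {
		panic(err)
	}
	fmt.Printf("read back: %s\n", entry.Line)
}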
