[docker plugin] Refactor pipeline reader for data safety #14375

Merged
9 changes: 0 additions & 9 deletions NOTICE.txt
@@ -5415,15 +5415,6 @@ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

---------------------------------------------------------------------
-Dependency: github.com/tonistiigi/fifo
-Revision: bda0ff6ed73c67bfb5e62bc9c697f146b7fd7f13
-License type (autodetected): Apache-2.0
-./x-pack/dockerlogbeat/vendor/github.com/tonistiigi/fifo/LICENSE:
---------------------------------------------------------------------
-Apache License 2.0
-
-
 --------------------------------------------------------------------
 Dependency: github.com/tsg/gopacket
 Revision: 7c5392a5f2b5c5fa393c71e1f9064e22b5408995
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/handlers.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"

 	"github.com/docker/engine/daemon/logger"
+
 	"github.com/elastic/beats/libbeat/publisher/pipeline"
 	"github.com/elastic/beats/x-pack/dockerlogbeat/pipelinemanager"

@@ -54,7 +55,7 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon
 		return
 	}

-	go cl.ConsumeAndSendLogs()
+	go cl.ConsumePipelineAndSend()

 	respondOK(w)
 } // end func
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/magefile.go
@@ -9,10 +9,11 @@ package main

 import (
 	"os"

-	"github.com/elastic/beats/dev-tools/mage"
 	"github.com/magefile/mage/mg"
 	"github.com/magefile/mage/sh"

+	"github.com/elastic/beats/dev-tools/mage"
+
 	"github.com/pkg/errors"
 )

1 change: 1 addition & 0 deletions x-pack/dockerlogbeat/main.go
@@ -9,6 +9,7 @@ import (
 	"os"

 	"github.com/docker/go-plugins-helpers/sdk"
+
 	"github.com/elastic/beats/libbeat/common"
 	logpcfg "github.com/elastic/beats/libbeat/logp/configure"
 	_ "github.com/elastic/beats/libbeat/outputs/console"
44 changes: 14 additions & 30 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
@@ -5,32 +5,28 @@
 package pipelinemanager

 import (
-	"context"
-	"encoding/binary"
 	"fmt"
 	"io"
 	"os"
 	"strings"
-	"syscall"
 	"time"

 	"github.com/docker/engine/api/types/plugins/logdriver"
+	"github.com/pkg/errors"

 	"github.com/elastic/beats/libbeat/beat"
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/publisher/pipeline"
-	pb "github.com/gogo/protobuf/io"
-	"github.com/pkg/errors"
-	"github.com/tonistiigi/fifo"
+	"github.com/elastic/beats/x-pack/dockerlogbeat/pipereader"
 )

 // ClientLogger is an instance of a pipeline logger client meant for reading from a single log stream
 // There's a many-to-one relationship between clients and pipelines.
 // Each container with the same config will get its own client to the same pipeline.
 type ClientLogger struct {
-	logFile      io.ReadWriteCloser
+	logFile      *pipereader.PipeReader
 	client       beat.Client
 	pipelineHash string
+	closer       chan struct{}
 }

 // newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
@@ -49,52 +45,40 @@ func newClientFromPipeline(pipeline *pipeline.Pipeline, file, hashstring string)
 	}

 	// Create the FIFO reader client from the FIFO pipe
-	inputFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, 0700)
+	inputFile, err := pipereader.NewReaderFromPath(file)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
 	}
 	logp.Info("Created new logger for %s", file)

-	return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring}, nil
+	return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring, closer: make(chan struct{})}, nil
 }

 // Close closes the pipeline client and reader
 func (cl *ClientLogger) Close() error {
+	logp.Info("Closing ClientLogger")
 	cl.client.Close()
-	cl.logFile.Close()
-	return cl.client.Close()
+
+	return cl.logFile.Close()
 }

-// ConsumeAndSendLogs reads from the FIFO file and sends to the pipeline client. This will block and should be called in its own goroutine
-// TODO: Publish() can block, which is a problem. This whole thing should be two goroutines.
-func (cl *ClientLogger) ConsumeAndSendLogs() {
-	reader := pb.NewUint32DelimitedReader(cl.logFile, binary.BigEndian, 2e6)
-
+// ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client
+func (cl *ClientLogger) ConsumePipelineAndSend() {
 	publishWriter := make(chan logdriver.LogEntry, 500)

 	go cl.publishLoop(publishWriter)
 	// Clean up the reader after we're done
 	defer func() {
-
 		close(publishWriter)
-
-		err := reader.Close()
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "Error closing FIFO reader: %s", err)
-		}
 	}()

 	var log logdriver.LogEntry
 	for {
-		err := reader.ReadMsg(&log)
+		err := cl.logFile.ReadMessage(&log)
 		if err != nil {
 			if err == io.EOF || err == os.ErrClosed || strings.Contains(err.Error(), "file already closed") {
+				cl.logFile.Close()
 				return
 			}
-			// I am...not sure why we do this
-			reader = pb.NewUint32DelimitedReader(cl.logFile, binary.BigEndian, 2e6)
 			fmt.Fprintf(os.Stderr, "Error getting message: %s\n", err)
+			return
 		}
 		publishWriter <- log
 		log.Reset()
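The hunk is truncated before publishLoop, the goroutine that drains publishWriter. For orientation, a minimal sketch of what such a consumer could look like, assuming a straightforward LogEntry-to-beat.Event conversion; the Fields layout here is illustrative, not the PR's actual mapping:

```go
// Hypothetical sketch of the publishLoop consumer; the real implementation
// is outside the visible diff. It drains the channel until
// ConsumePipelineAndSend closes it, turning each docker LogEntry into a
// beat.Event.
func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
	for entry := range reader {
		cl.client.Publish(beat.Event{
			Timestamp: time.Unix(0, entry.TimeNano),
			Fields: common.MapStr{
				"message": string(entry.Line),
				"source":  entry.Source,
				"partial": entry.Partial,
			},
		})
	}
}
```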
5 changes: 3 additions & 2 deletions x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
@@ -9,15 +9,16 @@ import (
 	"fmt"
 	"sort"

+	"github.com/pkg/errors"
+	yaml "gopkg.in/yaml.v2"
+
 	"github.com/elastic/beats/libbeat/beat"
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/idxmgmt"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/outputs"
 	"github.com/elastic/beats/libbeat/publisher/pipeline"
 	"github.com/elastic/beats/libbeat/publisher/processing"
-	"github.com/pkg/errors"
-	yaml "gopkg.in/yaml.v2"
 )

 // makeConfigHash is the helper function that turns a user config into a hash
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
@@ -8,10 +8,11 @@ import (
 	"fmt"
 	"sync"

+	"github.com/pkg/errors"
+
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/publisher/pipeline"
-	"github.com/pkg/errors"
 )

 // containerConfig is the common.Config unpacking type
114 changes: 114 additions & 0 deletions x-pack/dockerlogbeat/pipereader/reader.go
@@ -0,0 +1,114 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pipereader

import (
	"context"
	"encoding/binary"
	"io"
	"io/ioutil"
	"syscall"

	"github.com/containerd/fifo"
	"github.com/docker/engine/api/types/plugins/logdriver"
	"github.com/gogo/protobuf/proto"
	"github.com/pkg/errors"
)

// PipeReader reads from the FIFO pipe we get from the docker container
type PipeReader struct {
	fifoPipe    io.ReadCloser
	byteOrder   binary.ByteOrder
	lenFrameBuf []byte
	bodyBuf     []byte
	maxSize     int
}

// NewReaderFromPath creates a new FIFO pipe reader from a docker log pipe location
func NewReaderFromPath(file string) (*PipeReader, error) {
	inputFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, 0700)
	if err != nil {
		return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
	}

	return &PipeReader{fifoPipe: inputFile, byteOrder: binary.BigEndian, lenFrameBuf: make([]byte, 4), bodyBuf: nil, maxSize: 2e6}, nil
}

[Review comment, resolved] consider calling NewReaderFromReadCloser
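One way to apply that suggestion, as a sketch against the code above: open the FIFO, then delegate construction to NewReaderFromReadCloser so the field defaults live in one place.

```go
// Sketch of the reviewer's suggestion (not part of the merged diff):
// NewReaderFromPath opens the FIFO and hands it to NewReaderFromReadCloser.
func NewReaderFromPath(file string) (*PipeReader, error) {
	inputFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, 0700)
	if err != nil {
		return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
	}

	return NewReaderFromReadCloser(inputFile)
}
```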

// NewReaderFromReadCloser creates a new FIFO pipe reader from an existing ReadCloser
func NewReaderFromReadCloser(pipe io.ReadCloser) (*PipeReader, error) {
	return &PipeReader{fifoPipe: pipe, byteOrder: binary.BigEndian, lenFrameBuf: make([]byte, 4), bodyBuf: nil, maxSize: 2e6}, nil
}

// ReadMessage reads a log message from the pipe
// The message stream consists of a 4-byte length frame and a message body
// There are three logical paths for this code to take:
// 1) If the length is invalid (<= 0), we have bad data, and we cycle through the frames until we get a valid length.
// 2) If the length is valid but larger than the max buffer size, we discard that many bytes and continue.
// 3) If the length is valid and we can consume everything into the buffer, continue.
func (reader *PipeReader) ReadMessage(log *logdriver.LogEntry) error {
	// loop until we're at a valid state and ready to read a message body
	var lenFrame int
	var err error
	for {
		lenFrame, err = reader.getValidLengthFrame()
		if err != nil {
			return errors.Wrap(err, "error getting length frame")
		}
		if lenFrame <= reader.maxSize {
			break
		}

		// 2) we have a too-large message. Drain lenFrame bytes into ioutil.Discard
		// and continue. bodyBuf may still be nil here; CopyBuffer allocates a
		// scratch buffer in that case.
		_, err = io.CopyBuffer(ioutil.Discard, io.LimitReader(reader.fifoPipe, int64(lenFrame)), reader.bodyBuf)
		if err != nil {
			return errors.Wrap(err, "error emptying buffer")
		}
	}

	// proceed with 3)
	readBuf := reader.setBuffer(lenFrame)
	_, err = io.ReadFull(reader.fifoPipe, readBuf[:lenFrame])
	if err != nil {
		return errors.Wrap(err, "error reading buffer")
	}
	return proto.Unmarshal(readBuf[:lenFrame], log)
}

[Review comment, resolved] nit: given len(readBuf) == lenFrame we don't need readBuf[:lenFrame].

// setBuffer checks the needed size, and returns a buffer, allocating a new buffer if needed
func (reader *PipeReader) setBuffer(sz int) []byte {
	const minSz = 1024
	const maxSz = minSz * 32

	// return only the portion of the buffer we need
	if len(reader.bodyBuf) >= sz {
		return reader.bodyBuf[:sz]
	}

	// if we have an abnormally large buffer, don't set it to bodyBuf so GC can clean it up
	if sz > maxSz {
		return make([]byte, sz)
	}

	reader.bodyBuf = make([]byte, sz)
	return reader.bodyBuf
}
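To make the reuse rules concrete, a hypothetical same-package test (not part of the PR): a smaller request should reuse the cached buffer, while an oversized request gets a throwaway allocation.

```go
package pipereader

import "testing"

// TestSetBufferReuse exercises setBuffer's reuse rules; this test is
// illustrative and not part of the PR.
func TestSetBufferReuse(t *testing.T) {
	r := &PipeReader{}

	first := r.setBuffer(512)  // allocates bodyBuf
	second := r.setBuffer(256) // should reuse the same backing array
	if &first[0] != &second[0] {
		t.Error("expected the smaller request to reuse the cached buffer")
	}

	r.setBuffer(64 * 1024) // > maxSz: one-off allocation, bodyBuf untouched
	if len(r.bodyBuf) != 512 {
		t.Errorf("expected bodyBuf to stay at 512 bytes, got %d", len(r.bodyBuf))
	}
}
```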

// getValidLengthFrame reads length frames from the pipe until it gets a valid (positive) length
func (reader *PipeReader) getValidLengthFrame() (int, error) {
	for {
		if _, err := io.ReadFull(reader.fifoPipe, reader.lenFrameBuf); err != nil {
			return 0, err
		}
		bodyLen := int(reader.byteOrder.Uint32(reader.lenFrameBuf))
		if bodyLen > 0 {
			return bodyLen, nil
		}
	}
}

// Close closes the reader and underlying pipe
func (reader *PipeReader) Close() error {
	return reader.fifoPipe.Close()
}
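To make the framing concrete, a hypothetical round-trip test (not part of the PR): frame one LogEntry the way the docker log driver does (a 4-byte big-endian length, then the protobuf body) and check that ReadMessage decodes it.

```go
package pipereader_test

import (
	"bytes"
	"encoding/binary"
	"io/ioutil"
	"testing"

	"github.com/docker/engine/api/types/plugins/logdriver"
	"github.com/gogo/protobuf/proto"

	"github.com/elastic/beats/x-pack/dockerlogbeat/pipereader"
)

// TestReadMessageRoundTrip is illustrative, not part of the PR.
func TestReadMessageRoundTrip(t *testing.T) {
	entry := &logdriver.LogEntry{Line: []byte("hello"), Source: "stdout"}
	body, err := proto.Marshal(entry)
	if err != nil {
		t.Fatal(err)
	}

	// Write the length frame, then the message body.
	var buf bytes.Buffer
	lenFrame := make([]byte, 4)
	binary.BigEndian.PutUint32(lenFrame, uint32(len(body)))
	buf.Write(lenFrame)
	buf.Write(body)

	reader, err := pipereader.NewReaderFromReadCloser(ioutil.NopCloser(&buf))
	if err != nil {
		t.Fatal(err)
	}

	var got logdriver.LogEntry
	if err := reader.ReadMessage(&got); err != nil {
		t.Fatal(err)
	}
	if string(got.Line) != "hello" {
		t.Fatalf("unexpected line: %q", got.Line)
	}
}
```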