Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[docker plugin] Refactor pipeline reader for data safety #14375

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"

"github.com/docker/engine/daemon/logger"

"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipelinemanager"

Expand Down Expand Up @@ -54,7 +55,7 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon
return
}

go cl.ConsumeAndSendLogs()
go cl.ConsumePipelineAndSend()

respondOK(w)
} // end func
Expand Down
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ package main
import (
"os"

"github.com/elastic/beats/dev-tools/mage"
"github.com/magefile/mage/mg"
"github.com/magefile/mage/sh"

"github.com/elastic/beats/dev-tools/mage"

"github.com/pkg/errors"
)

Expand Down
1 change: 1 addition & 0 deletions x-pack/dockerlogbeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"

"github.com/docker/go-plugins-helpers/sdk"

"github.com/elastic/beats/libbeat/common"
logpcfg "github.com/elastic/beats/libbeat/logp/configure"
_ "github.com/elastic/beats/libbeat/outputs/console"
Expand Down
75 changes: 50 additions & 25 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,28 @@
package pipelinemanager

import (
"context"
"encoding/binary"
"fmt"
"io"
"os"
"strings"
"syscall"
"time"

"github.com/docker/engine/api/types/plugins/logdriver"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/pipeline"
pb "github.com/gogo/protobuf/io"
"github.com/pkg/errors"
"github.com/tonistiigi/fifo"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipereader"
)

// ClientLogger is an instance of a pipeline logger client meant for reading from a single log stream
// There's a many-to-one relationship between clients and pipelines.
// Each container with the same config will get its own client to the same pipeline.
type ClientLogger struct {
logFile io.ReadWriteCloser
logFile *pipereader.PipeReader
client beat.Client
pipelineHash string
closer chan struct{}
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
Expand All @@ -49,28 +45,61 @@ func newClientFromPipeline(pipeline *pipeline.Pipeline, file, hashstring string)
}

// Create the FIFO reader client from the FIFO pipe
inputFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, 0700)
inputFile, err := pipereader.NewReaderFromPath(file)
if err != nil {
return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
}
logp.Info("Created new logger for %s", file)

return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring}, nil
return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring, closer: make(chan struct{})}, nil
}

// Close closes the pipeline client and reader
func (cl *ClientLogger) Close() error {
logp.Info("Closing ClientLogger")
cl.client.Close()
cl.logFile.Close()
return cl.client.Close()

return cl.logFile.Close()
}

// ConsumeAndSendLogs reads from the FIFO file and sends to the pipeline client. This will block and should be called in its own goroutine
// TODO: Publish() can block, which is a problem. This whole thing should be two goroutines.
func (cl *ClientLogger) ConsumeAndSendLogs() {
reader := pb.NewUint32DelimitedReader(cl.logFile, binary.BigEndian, 2e6)

// func (cl *ClientLogger) ConsumeAndSendLogs() {
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
// reader := pb.NewUint32DelimitedReader(cl.logFile, binary.BigEndian, 2e6)

// publishWriter := make(chan logdriver.LogEntry, 500)

// go cl.publishLoop(publishWriter)
// // Clean up the reader after we're done
// defer func() {

// close(publishWriter)

// err := reader.Close()
// if err != nil {
// fmt.Fprintf(os.Stderr, "Error closing FIFO reader: %s", err)
// }
// }()

// var log logdriver.LogEntry
// for {
// err := reader.ReadMsg(&log)
// if err != nil {
// if err == io.EOF || err == os.ErrClosed || strings.Contains(err.Error(), "file already closed") {
// cl.logFile.Close()
// return
// }
// // I am...not sure why we do this
// reader = pb.NewUint32DelimitedReader(cl.logFile, binary.BigEndian, 2e6)
// }
// publishWriter <- log
// log.Reset()

// }
// }

// ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client
func (cl *ClientLogger) ConsumePipelineAndSend() {
publishWriter := make(chan logdriver.LogEntry, 500)

go cl.publishLoop(publishWriter)
Expand All @@ -79,22 +108,18 @@ func (cl *ClientLogger) ConsumeAndSendLogs() {

close(publishWriter)

err := reader.Close()
err := cl.logFile.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "Error closing FIFO reader: %s", err)
fmt.Fprintf(os.Stderr, "Error closing FIFO reader: %s\n", err)
}
}()

var log logdriver.LogEntry
for {
err := reader.ReadMsg(&log)
err := cl.logFile.ReadMessage(log)
if err != nil {
if err == io.EOF || err == os.ErrClosed || strings.Contains(err.Error(), "file already closed") {
cl.logFile.Close()
return
}
// I am...not sure why we do this
reader = pb.NewUint32DelimitedReader(cl.logFile, binary.BigEndian, 2e6)
fmt.Fprintf(os.Stderr, "Error getting message: %s\n", err)
return
}
publishWriter <- log
log.Reset()
Expand Down
5 changes: 3 additions & 2 deletions x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
"fmt"
"sort"

"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"
)

// makeConfigHash is the helper function that turns a user config into a hash
Expand Down
3 changes: 2 additions & 1 deletion x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"fmt"
"sync"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/pkg/errors"
)

// containerConfig is the common.Config unpacking type
Expand Down
101 changes: 101 additions & 0 deletions x-pack/dockerlogbeat/pipereader/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pipereader

import (
"context"
"encoding/binary"
"io"
"syscall"

"github.com/docker/engine/api/types/plugins/logdriver"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/tonistiigi/fifo"
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
)

// PipeReader reads length-framed protobuf log messages from the FIFO pipe
// that the docker daemon hands to the plugin.
type PipeReader struct {
fifoPipe io.ReadWriteCloser // the open FIFO; only the read side is used by this type
byteOrder binary.ByteOrder // byte order of the 4-byte length frame (set to big-endian by NewReaderFromPath)
lenFrameBuf []byte // 4-byte scratch buffer for the length frame
bodyBuf []byte // scratch buffer for the message body; nil until first use
maxSize int // largest body we will buffer (2e6); larger frames are drained and skipped
}

// NewReaderFromPath creates a new FIFO pipe reader from a docker log pipe location
func NewReaderFromPath(file string) (*PipeReader, error) {
inputFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, 0700)
if err != nil {
return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
}

return &PipeReader{fifoPipe: inputFile, byteOrder: binary.BigEndian, lenFrameBuf: make([]byte, 4), bodyBuf: nil, maxSize: 2e6}, nil
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider calling NewReaderFromReadCloser

}
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved

// ReadMessage reads a log message from the pipe
// The message stream consists of a 4-byte length frame and a message body
// There's three logical paths for this code to take:
// 1) If length <0, we have bad data, and we cycle through the frames until we get a valid length.
// 2) If length is valid but larger than the max buffer size, we disregard length bytes and continue
// 3) If length is valid and we can consume everything into the buffer, continue.
func (reader *PipeReader) ReadMessage(log logdriver.LogEntry) error {
// loop until we're at a valid state and ready to read a message body
var lenFrame int
var err error
for {
lenFrame, err = reader.getValidLengthFrame()
if err != nil {
return errors.Wrap(err, "error getting length frame")
}

if lenFrame <= reader.maxSize {
break
}

// 2) we have a too-large message. Disregard length bytes
_, err = io.CopyN(nil, reader.fifoPipe, int64(lenFrame))
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "error emptying buffer")
}
}

//proceed with 3)
reader.bodyBuf = make([]byte, lenFrame)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
_, err = io.ReadFull(reader.fifoPipe, reader.bodyBuf[:lenFrame])
if err != nil {
return errors.Wrap(err, "error reading buffer")
}
return proto.Unmarshal(reader.bodyBuf[:lenFrame], &log)

}

// Close closes the reader and underlying pipe.
func (reader *PipeReader) Close() error {
	err := reader.fifoPipe.Close()
	return err
}
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved

// getValidLengthFrame guarentees that we return a valid length field
func (reader *PipeReader) getValidLengthFrame() (int, error) {
if _, err := io.ReadFull(reader.fifoPipe, reader.lenFrameBuf); err != nil {
return 0, err
}
bodyLen := int(reader.byteOrder.Uint32(reader.lenFrameBuf))
// 1). Invalid Length.
if bodyLen < 0 {
//TODO: should we have some kind of 'timeout' or reporting here?
for {
if _, err := io.ReadFull(reader.fifoPipe, reader.lenFrameBuf); err != nil {
return 0, err
}
bodyLen = int(reader.byteOrder.Uint32(reader.lenFrameBuf))
if bodyLen > 0 {
break
}
}
} // end of len check
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved

return bodyLen, nil
}