diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go index 5225636df75..fb2388c80a0 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go +++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go @@ -11,12 +11,10 @@ import ( "github.com/docker/docker/api/types/plugins/logdriver" "github.com/docker/docker/daemon/logger" - "github.com/pkg/errors" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher/pipeline" "github.com/elastic/beats/x-pack/dockerlogbeat/pipereader" ) @@ -33,14 +31,14 @@ type ClientLogger struct { } // newClientFromPipeline creates a new Client logger with a FIFO reader and beat client -func newClientFromPipeline(pipeline *pipeline.Pipeline, file, hashstring string, info logger.Info) (*ClientLogger, error) { +func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hashstring string, info logger.Info) (*ClientLogger, error) { // setup the beat client settings := beat.ClientConfig{ WaitClose: 0, } clientLogger := logp.NewLogger("clientLogReader") settings.ACKCount = func(n int) { - clientLogger.Debugf("Pipeline client (%s) ACKS; %v", file, n) + clientLogger.Debugf("Pipeline client ACKS; %v", n) } settings.PublishMode = beat.DefaultGuarantees client, err := pipeline.ConnectWith(settings) @@ -48,12 +46,7 @@ func newClientFromPipeline(pipeline *pipeline.Pipeline, file, hashstring string, return nil, err } - // Create the FIFO reader client from the FIPO pipe - inputFile, err := pipereader.NewReaderFromPath(file) - if err != nil { - return nil, errors.Wrapf(err, "error opening logger fifo: %q", file) - } - clientLogger.Debugf("Created new logger for %s", file) + clientLogger.Debugf("Created new logger for %s", hashstring) return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil } diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go new file mode 100644 index 00000000000..0eb6d66373b --- /dev/null +++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go @@ -0,0 +1,55 @@ +package pipelinemanager + +import ( + "testing" + + "github.com/docker/docker/daemon/logger" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/x-pack/dockerlogbeat/pipelinemock" + "github.com/elastic/beats/x-pack/dockerlogbeat/pipereader" +) + +func TestNewClient(t *testing.T) { + mockConnector := &pipelinemock.MockPipelineConnector{} + client := createNewClient(t, mockConnector) + // ConsumePipelineAndSent is what does the actual reading and sending. + // After we spawn this goroutine, we wait until we get something back + go client.ConsumePipelineAndSend() + event := testReturnAndClose(t, mockConnector, client) + assert.Equal(t, event.Fields["message"], "This is a log line") + +} + +func createNewClient(t *testing.T, mockConnector *pipelinemock.MockPipelineConnector) *ClientLogger { + // an example container metadata struct + cfgObject := logger.Info{ + Config: map[string]string{"output.elasticsearch": "localhost:9200"}, + ContainerLabels: map[string]string{"test.label": "test"}, + ContainerID: "3acc92989a97c415905eba090277b8a8834d087e58a95bed55450338ce0758dd", + ContainerName: "testContainer", + ContainerImageName: "TestImage", + } + + // create a new pipeline reader for use with the libbeat client + reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInput(t)) + assert.NoError(t, err) + + client, err := newClientFromPipeline(mockConnector, reader, "aaa", cfgObject) + assert.NoError(t, err) + + return client +} + +func testReturnAndClose(t *testing.T, conn *pipelinemock.MockPipelineConnector, client *ClientLogger) beat.Event { + defer client.Close() + for { + // wait until we get our example event back + if events := conn.GetAllEvents(); len(events) > 0 { + assert.NotEmpty(t, events) + return events[0] + } + } + +} diff --git a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go index d9f9ddd8c63..d979cf8d1da 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go +++ b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go @@ -8,6 +8,8 @@ import ( "fmt" "sync" + "github.com/elastic/beats/x-pack/dockerlogbeat/pipereader" + "github.com/docker/docker/daemon/logger" "github.com/pkg/errors" @@ -80,8 +82,13 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig logger.Info, f return nil, errors.Wrap(err, "error getting pipeline") } + pipeRead, err := pipereader.NewReaderFromPath(file) + if err != nil { + return nil, errors.Wrap(err, "") + } + //actually get to crafting the new client. - cl, err := newClientFromPipeline(pipeline.pipeline, file, hashstring, containerConfig) + cl, err := newClientFromPipeline(pipeline.pipeline, pipeRead, hashstring, containerConfig) if err != nil { return nil, errors.Wrap(err, "error creating client") } diff --git a/x-pack/dockerlogbeat/pipelinemock/pipelines.go b/x-pack/dockerlogbeat/pipelinemock/pipelines.go new file mode 100644 index 00000000000..3d8978773bd --- /dev/null +++ b/x-pack/dockerlogbeat/pipelinemock/pipelines.go @@ -0,0 +1,89 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipelinemock + +import ( + "fmt" + "sync" + + "github.com/elastic/beats/libbeat/beat" +) + +// MockBeatClient mocks the Client interface +type MockBeatClient struct { + publishes []beat.Event + closed bool + mtx sync.Mutex +} + +// GetEvents returns the published events +func (c *MockBeatClient) GetEvents() []beat.Event { + c.mtx.Lock() + defer c.mtx.Unlock() + + return c.publishes +} + +// Publish mocks the Client Publish method +func (c *MockBeatClient) Publish(e beat.Event) { + c.PublishAll([]beat.Event{e}) +} + +// PublishAll mocks the Client PublishAll method +func (c *MockBeatClient) PublishAll(events []beat.Event) { + c.mtx.Lock() + defer c.mtx.Unlock() + + for _, e := range events { + c.publishes = append(c.publishes, e) + } +} + +// Close mocks the Client Close method +func (c *MockBeatClient) Close() error { + c.mtx.Lock() + defer c.mtx.Unlock() + + if c.closed { + return fmt.Errorf("mock client already closed") + } + + c.closed = true + return nil +} + +// MockPipelineConnector mocks the PipelineConnector interface +type MockPipelineConnector struct { + clients []*MockBeatClient + mtx sync.Mutex +} + +// GetAllEvents returns all events associated with a pipeline +func (pc *MockPipelineConnector) GetAllEvents() []beat.Event { + + evList := []beat.Event{} + for _, clientEvents := range pc.clients { + evList = append(evList, clientEvents.GetEvents()...) + } + + return evList +} + +// Connect mocks the PipelineConnector Connect method +func (pc *MockPipelineConnector) Connect() (beat.Client, error) { + return pc.ConnectWith(beat.ClientConfig{}) +} + +// ConnectWith mocks the PipelineConnector ConnectWith method +func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, error) { + pc.mtx.Lock() + defer pc.mtx.Unlock() + + c := &MockBeatClient{} + + pc.clients = append(pc.clients, c) + + return c, nil +} diff --git a/x-pack/dockerlogbeat/pipelinemock/reader.go b/x-pack/dockerlogbeat/pipelinemock/reader.go new file mode 100644 index 00000000000..257ac2424df --- /dev/null +++ b/x-pack/dockerlogbeat/pipelinemock/reader.go @@ -0,0 +1,42 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipelinemock + +import ( + "bytes" + "encoding/binary" + "io" + "io/ioutil" + "testing" + + "github.com/docker/docker/api/types/plugins/logdriver" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" +) + +// CreateTestInput creates a mocked ReadCloser for the pipelineReader +func CreateTestInput(t *testing.T) io.ReadCloser { + //setup + exampleStruct := &logdriver.LogEntry{ + Source: "Test", + TimeNano: 0, + Line: []byte("This is a log line"), + Partial: false, + PartialLogMetadata: &logdriver.PartialLogEntryMetadata{ + Last: false, + Id: "", + Ordinal: 0, + }, + } + + rawBytes, err := proto.Marshal(exampleStruct) + assert.NoError(t, err) + + sizeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(sizeBytes, uint32(len(rawBytes))) + rawBytes = append(sizeBytes, rawBytes...) + rc := ioutil.NopCloser(bytes.NewReader(rawBytes)) + return rc +} diff --git a/x-pack/dockerlogbeat/pipereader/reader_test.go b/x-pack/dockerlogbeat/pipereader/reader_test.go index 2d41183a89b..024c2c2f21c 100644 --- a/x-pack/dockerlogbeat/pipereader/reader_test.go +++ b/x-pack/dockerlogbeat/pipereader/reader_test.go @@ -5,41 +5,19 @@ package pipereader import ( - "bytes" - "encoding/binary" - "io/ioutil" "testing" "github.com/stretchr/testify/assert" - "github.com/gogo/protobuf/proto" - "github.com/docker/docker/api/types/plugins/logdriver" ) func TestPipeReader(t *testing.T) { - //setup - exampleStruct := &logdriver.LogEntry{ - Source: "Test", - TimeNano: 0, - Line: []byte("This is a log line"), - Partial: false, - PartialLogMetadata: &logdriver.PartialLogEntryMetadata{ - Last: false, - Id: "", - Ordinal: 0, - }, - } - rawBytes, err := proto.Marshal(exampleStruct) - assert.NoError(t, err) - - sizeBytes := make([]byte, 4) - binary.BigEndian.PutUint32(sizeBytes, uint32(len(rawBytes))) - rawBytes = append(sizeBytes, rawBytes...) + rawBytes := pipeineMock.CreateTestInput(t) // actual test - pipeRead, err := NewReaderFromReadCloser(ioutil.NopCloser(bytes.NewReader(rawBytes))) + pipeRead, err := NewReaderFromReadCloser(rawBytes) assert.NoError(t, err) var outLog logdriver.LogEntry err = pipeRead.ReadMessage(&outLog)