Skip to content

Commit

Permalink
expand test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
fearful-symmetry committed Nov 27, 2019
1 parent 5121aa6 commit da26d09
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 35 deletions.
13 changes: 3 additions & 10 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ import (

"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipereader"
)

Expand All @@ -33,27 +31,22 @@ type ClientLogger struct {
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
func newClientFromPipeline(pipeline *pipeline.Pipeline, file, hashstring string, info logger.Info) (*ClientLogger, error) {
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hashstring string, info logger.Info) (*ClientLogger, error) {
// setup the beat client
settings := beat.ClientConfig{
WaitClose: 0,
}
clientLogger := logp.NewLogger("clientLogReader")
settings.ACKCount = func(n int) {
clientLogger.Debugf("Pipeline client (%s) ACKS; %v", file, n)
clientLogger.Debugf("Pipeline client ACKS; %v", n)
}
settings.PublishMode = beat.DefaultGuarantees
client, err := pipeline.ConnectWith(settings)
if err != nil {
return nil, err
}

// Create the FIFO reader client from the FIPO pipe
inputFile, err := pipereader.NewReaderFromPath(file)
if err != nil {
return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
}
clientLogger.Debugf("Created new logger for %s", file)
clientLogger.Debugf("Created new logger for %s", hashstring)

return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil
}
Expand Down
55 changes: 55 additions & 0 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pipelinemanager

import (
"testing"

"github.com/docker/docker/daemon/logger"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipelinemock"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipereader"
)

func TestNewClient(t *testing.T) {
mockConnector := &pipelinemock.MockPipelineConnector{}
client := createNewClient(t, mockConnector)
// ConsumePipelineAndSent is what does the actual reading and sending.
// After we spawn this goroutine, we wait until we get something back
go client.ConsumePipelineAndSend()
event := testReturnAndClose(t, mockConnector, client)
assert.Equal(t, event.Fields["message"], "This is a log line")

}

func createNewClient(t *testing.T, mockConnector *pipelinemock.MockPipelineConnector) *ClientLogger {
// an example container metadata struct
cfgObject := logger.Info{
Config: map[string]string{"output.elasticsearch": "localhost:9200"},
ContainerLabels: map[string]string{"test.label": "test"},
ContainerID: "3acc92989a97c415905eba090277b8a8834d087e58a95bed55450338ce0758dd",
ContainerName: "testContainer",
ContainerImageName: "TestImage",
}

// create a new pipeline reader for use with the libbeat client
reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInput(t))
assert.NoError(t, err)

client, err := newClientFromPipeline(mockConnector, reader, "aaa", cfgObject)
assert.NoError(t, err)

return client
}

func testReturnAndClose(t *testing.T, conn *pipelinemock.MockPipelineConnector, client *ClientLogger) beat.Event {
defer client.Close()
for {
// wait until we get our example event back
if events := conn.GetAllEvents(); len(events) > 0 {
assert.NotEmpty(t, events)
return events[0]
}
}

}
9 changes: 8 additions & 1 deletion x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/x-pack/dockerlogbeat/pipereader"

"github.com/docker/docker/daemon/logger"
"github.com/pkg/errors"

Expand Down Expand Up @@ -80,8 +82,13 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig logger.Info, f
return nil, errors.Wrap(err, "error getting pipeline")
}

pipeRead, err := pipereader.NewReaderFromPath(file)
if err != nil {
return nil, errors.Wrap(err, "")
}

//actually get to crafting the new client.
cl, err := newClientFromPipeline(pipeline.pipeline, file, hashstring, containerConfig)
cl, err := newClientFromPipeline(pipeline.pipeline, pipeRead, hashstring, containerConfig)
if err != nil {
return nil, errors.Wrap(err, "error creating client")
}
Expand Down
89 changes: 89 additions & 0 deletions x-pack/dockerlogbeat/pipelinemock/pipelines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pipelinemock

import (
"fmt"
"sync"

"github.com/elastic/beats/libbeat/beat"
)

// MockBeatClient mocks the Client interface
type MockBeatClient struct {
publishes []beat.Event
closed bool
mtx sync.Mutex
}

// GetEvents returns the published events
func (c *MockBeatClient) GetEvents() []beat.Event {
c.mtx.Lock()
defer c.mtx.Unlock()

return c.publishes
}

// Publish mocks the Client Publish method
func (c *MockBeatClient) Publish(e beat.Event) {
c.PublishAll([]beat.Event{e})
}

// PublishAll mocks the Client PublishAll method
func (c *MockBeatClient) PublishAll(events []beat.Event) {
c.mtx.Lock()
defer c.mtx.Unlock()

for _, e := range events {
c.publishes = append(c.publishes, e)
}
}

// Close mocks the Client Close method
func (c *MockBeatClient) Close() error {
c.mtx.Lock()
defer c.mtx.Unlock()

if c.closed {
return fmt.Errorf("mock client already closed")
}

c.closed = true
return nil
}

// MockPipelineConnector mocks the PipelineConnector interface
type MockPipelineConnector struct {
clients []*MockBeatClient
mtx sync.Mutex
}

// GetAllEvents returns all events associated with a pipeline
func (pc *MockPipelineConnector) GetAllEvents() []beat.Event {

evList := []beat.Event{}
for _, clientEvents := range pc.clients {
evList = append(evList, clientEvents.GetEvents()...)
}

return evList
}

// Connect mocks the PipelineConnector Connect method
func (pc *MockPipelineConnector) Connect() (beat.Client, error) {
return pc.ConnectWith(beat.ClientConfig{})
}

// ConnectWith mocks the PipelineConnector ConnectWith method
func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, error) {
pc.mtx.Lock()
defer pc.mtx.Unlock()

c := &MockBeatClient{}

pc.clients = append(pc.clients, c)

return c, nil
}
42 changes: 42 additions & 0 deletions x-pack/dockerlogbeat/pipelinemock/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pipelinemock

import (
"bytes"
"encoding/binary"
"io"
"io/ioutil"
"testing"

"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
)

// CreateTestInput creates a mocked ReadCloser for the pipelineReader
func CreateTestInput(t *testing.T) io.ReadCloser {
//setup
exampleStruct := &logdriver.LogEntry{
Source: "Test",
TimeNano: 0,
Line: []byte("This is a log line"),
Partial: false,
PartialLogMetadata: &logdriver.PartialLogEntryMetadata{
Last: false,
Id: "",
Ordinal: 0,
},
}

rawBytes, err := proto.Marshal(exampleStruct)
assert.NoError(t, err)

sizeBytes := make([]byte, 4)
binary.BigEndian.PutUint32(sizeBytes, uint32(len(rawBytes)))
rawBytes = append(sizeBytes, rawBytes...)
rc := ioutil.NopCloser(bytes.NewReader(rawBytes))
return rc
}
26 changes: 2 additions & 24 deletions x-pack/dockerlogbeat/pipereader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,19 @@
package pipereader

import (
"bytes"
"encoding/binary"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"

"github.com/gogo/protobuf/proto"

"github.com/docker/docker/api/types/plugins/logdriver"
)

func TestPipeReader(t *testing.T) {

//setup
exampleStruct := &logdriver.LogEntry{
Source: "Test",
TimeNano: 0,
Line: []byte("This is a log line"),
Partial: false,
PartialLogMetadata: &logdriver.PartialLogEntryMetadata{
Last: false,
Id: "",
Ordinal: 0,
},
}
rawBytes, err := proto.Marshal(exampleStruct)
assert.NoError(t, err)

sizeBytes := make([]byte, 4)
binary.BigEndian.PutUint32(sizeBytes, uint32(len(rawBytes)))
rawBytes = append(sizeBytes, rawBytes...)
rawBytes := pipeineMock.CreateTestInput(t)

// actual test
pipeRead, err := NewReaderFromReadCloser(ioutil.NopCloser(bytes.NewReader(rawBytes)))
pipeRead, err := NewReaderFromReadCloser(rawBytes)
assert.NoError(t, err)
var outLog logdriver.LogEntry
err = pipeRead.ReadMessage(&outLog)
Expand Down

0 comments on commit da26d09

Please sign in to comment.