Skip to content

Commit

Permalink
Fix ECS fields in Elastic Log Driver, change index prefix (#20522) (#…
Browse files Browse the repository at this point in the history
…20577)

* change index names, clean up code

* update docs

* fix up metadata handling

* fix docs

* add changelog entry

(cherry picked from commit a8f9cc8)
  • Loading branch information
fearful-symmetry authored Aug 17, 2020
1 parent 44103e9 commit 40cb94c
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ field. You can revert this change by configuring tags for the module and omittin

*Elastic Log Driver*
- Add support for `docker logs` command {pull}19531[19531]
- Add support to change beat name, and support for Kibana Logs. {pull}20522[20522]

==== Deprecated

Expand Down
5 changes: 5 additions & 0 deletions x-pack/dockerlogbeat/docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ indices. For example: +"dockerlogs-%{+yyyy.MM.dd}"+.

3+|*Advanced:*

|`name`
|`testbeat`
| A custom value that will be inserted into the document as `agent.name`.
If not set, it will be the hostname of Docker host.

|`backoff_init`
|`1s`
|The number of seconds to wait before trying to reconnect to {es} after
Expand Down
8 changes: 7 additions & 1 deletion x-pack/dockerlogbeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ func main() {
if err != nil {
fatal("DESTROY_LOGS_ON_STOP must be 'true' or 'false': %s", err)
}
pipelines := pipelinemanager.NewPipelineManager(logDestroy)

hostname, err := os.Hostname()
if err != nil {
fatal("Error fetching hostname: %s", err)
}

pipelines := pipelinemanager.NewPipelineManager(logDestroy, hostname)

sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
// Create handlers for startup and shutdown of the log driver
Expand Down
52 changes: 36 additions & 16 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,20 @@ type ClientLogger struct {
logger *logp.Logger
// ContainerMeta is the metadata object for the container we get from docker
ContainerMeta logger.Info
// ContainerECSMeta is a container metadata object appended to every event
ContainerECSMeta common.MapStr
// logFile is the FIFO reader that reads from the docker container stdio
logFile *pipereader.PipeReader
// client is the libbeat client object that sends logs upstream
client beat.Client
// localLog manages the local JSON logs for containers
localLog logger.Logger
// hostname for event metadata
hostname string
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger, hostname string) (*ClientLogger, error) {
// setup the beat client
settings := beat.ClientConfig{
WaitClose: 0,
Expand All @@ -59,11 +63,13 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade
clientLogger.Debugf("Created new logger for %d", hash)

return &ClientLogger{logFile: inputFile,
client: client,
pipelineHash: hash,
ContainerMeta: info,
localLog: localLog,
logger: clientLogger}, nil
client: client,
pipelineHash: hash,
ContainerMeta: info,
ContainerECSMeta: constructECSContainerData(info),
localLog: localLog,
logger: clientLogger,
hostname: hostname}, nil
}

// Close closes the pipeline client and reader
Expand Down Expand Up @@ -100,6 +106,26 @@ func (cl *ClientLogger) ConsumePipelineAndSend() {
}
}

// constructECSContainerData creates an ES-ready MapString object with container metadata.
func constructECSContainerData(metadata logger.Info) common.MapStr {

var containerImageName, containerImageTag string
if idx := strings.IndexRune(metadata.ContainerImageName, ':'); idx >= 0 {
containerImageName = string([]rune(metadata.ContainerImageName)[:idx])
containerImageTag = string([]rune(metadata.ContainerImageName)[idx+1:])
}

return common.MapStr{
"labels": helper.DeDotLabels(metadata.ContainerLabels, true),
"id": metadata.ContainerID,
"name": helper.ExtractContainerName([]string{metadata.ContainerName}),
"image": common.MapStr{
"name": containerImageName,
"tag": containerImageTag,
},
}
}

// publishLoop sits in a loop and waits for events to publish
// Publish() can block if there is an upstream output issue. This is a problem because if the FIFO queues that handle the docker logs fill up, plugins can no longer send logs
// A buffered channel with its own publish gives us a little more wiggle room.
Expand All @@ -117,20 +143,14 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
cl.client.Publish(beat.Event{
Timestamp: time.Unix(0, entry.TimeNano),
Fields: common.MapStr{
"message": line,
"container": common.MapStr{
"labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true),
"id": cl.ContainerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
"image": common.MapStr{
"name": cl.ContainerMeta.ContainerImageName,
},
"message": line,
"container": cl.ContainerECSMeta,
"host": common.MapStr{
"name": cl.hostname,
},
},
})

}

}

func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
Expand Down
12 changes: 2 additions & 10 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,7 @@ func setupTestReader(t *testing.T, logString string, containerConfig logger.Info
}

// createNewClient sets up the "write side" of the pipeline, creating a log event to write and send back into the test.
func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock.MockPipelineConnector, containerConfig logger.Info) *ClientLogger {
// an example container metadata struct
cfgObject := logger.Info{
Config: map[string]string{"output.elasticsearch": "localhost:9200"},
ContainerLabels: map[string]string{"test.label": "test"},
ContainerID: "3acc92989a97c415905eba090277b8a8834d087e58a95bed55450338ce0758dd",
ContainerName: "testContainer",
ContainerImageName: "TestImage",
}
func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock.MockPipelineConnector, cfgObject logger.Info) *ClientLogger {

// create a new pipeline reader for use with the libbeat client
reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString))
Expand All @@ -100,7 +92,7 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock
localLog, err := jsonfilelog.New(info)
assert.NoError(t, err)

client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog)
client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog, "test")
require.NoError(t, err)

return client
Expand Down
2 changes: 2 additions & 0 deletions x-pack/dockerlogbeat/pipelinemanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ContainerOutputConfig struct {
CloudID string `struct:"cloud.id,omitempty"`
CloudAuth string `struct:"cloud.auth,omitempty"`
ProxyURL string `struct:"output.elasticsearch.proxy_url,omitempty"`
BeatName string `struct:"-"`
}

// NewCfgFromRaw returns a ContainerOutputConfig based on a raw config we get from the API
Expand All @@ -53,6 +54,7 @@ func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) {
newCfg.Timeout = input["timeout"]
newCfg.BackoffInit = input["backoff_init"]
newCfg.BackoffMax = input["backoff_max"]
newCfg.BeatName = input["name"]

return newCfg, nil
}
Expand Down
33 changes: 9 additions & 24 deletions x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cloudid"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand Down Expand Up @@ -49,8 +48,7 @@ func makeConfigHash(cfg map[string]string) string {
}

// load pipeline starts up a new pipeline with the given config
func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp.Logger) (*Pipeline, error) {

func loadNewPipeline(logOptsConfig ContainerOutputConfig, hostname string, log *logp.Logger) (*Pipeline, error) {
cfg, err := logOptsConfig.CreateConfig()
if err != nil {
return nil, err
Expand All @@ -68,7 +66,7 @@ func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp
return nil, fmt.Errorf("unpacking config failed: %v", err)
}

info, err := getBeatInfo(cfg)
info, err := getBeatInfo(logOptsConfig, hostname)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -131,38 +129,25 @@ func parseCfgKeys(cfg map[string]string) (map[string]interface{}, error) {
}

// getBeatInfo returns the beat.Info type needed to start the pipeline
func getBeatInfo(cfg *common.Config) (beat.Info, error) {
func getBeatInfo(pluginOpts ContainerOutputConfig, hostname string) (beat.Info, error) {
vers := version.GetDefaultVersion()
hostname, err := os.Hostname()
if err != nil {
return beat.Info{}, errors.Wrap(err, "error getting hostname")
}

eid, err := uuid.NewV4()
if err != nil {
return beat.Info{}, errors.Wrap(err, "error creating ephemeral ID")
}

type nameStr struct {
Name string `config:"name"`
}
name := nameStr{}
err = cfg.Unpack(&name)
if err != nil {
return beat.Info{}, fmt.Errorf("unpacking config failed: %v", err)
}

if name.Name == "" {
name.Name = "elastic-log-driver"
}
id, err := loadMeta("/tmp/meta.json")
if err != nil {
return beat.Info{}, errors.Wrap(err, "error loading UUID")
}

beatName := "elastic-log-driver"

info := beat.Info{
Beat: name.Name,
Name: name.Name,
IndexPrefix: name.Name,
Beat: beatName,
Name: pluginOpts.BeatName,
IndexPrefix: "logs-docker",
Hostname: hostname,
Version: vers,
EphemeralID: eid,
Expand Down
13 changes: 8 additions & 5 deletions x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,20 @@ type PipelineManager struct {
logDirectory string
// destroyLogsOnStop indicates for the client to remove log files when a container stops
destroyLogsOnStop bool
// hostname of the docker host
hostname string
}

// NewPipelineManager creates a new Pipeline map
func NewPipelineManager(logDestroy bool) *PipelineManager {
func NewPipelineManager(logDestroy bool, hostname string) *PipelineManager {
return &PipelineManager{
Logger: logp.NewLogger("PipelineManager"),
pipelines: make(map[uint64]*Pipeline),
clients: make(map[string]*ClientLogger),
clientLogger: make(map[string]logger.Logger),
logDirectory: "/var/log/docker/containers",
destroyLogsOnStop: logDestroy,
hostname: hostname,
}
}

Expand Down Expand Up @@ -102,7 +105,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
if err != nil {
return nil, errors.Wrap(err, "error creating config hash")
}
pipeline, err := pm.getOrCreatePipeline(containerConfig, file, hashstring)
pipeline, err := pm.getOrCreatePipeline(containerConfig, hashstring)
if err != nil {
return nil, errors.Wrap(err, "error getting pipeline")
}
Expand Down Expand Up @@ -135,7 +138,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
}

//actually get to crafting the new client.
cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog)
cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog, pm.hostname)
if err != nil {
return nil, errors.Wrap(err, "error creating client")
}
Expand Down Expand Up @@ -198,15 +201,15 @@ func (pm *PipelineManager) CreateReaderForContainer(info logger.Info, config log

// checkAndCreatePipeline performs the pipeline check and creation as one atomic operation
// It will either return a new pipeline, or an existing one from the pipeline map
func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, file string, hash uint64) (*Pipeline, error) {
func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, hash uint64) (*Pipeline, error) {
pm.mu.Lock()
defer pm.mu.Unlock()

var pipeline *Pipeline
var err error
pipeline, test := pm.pipelines[hash]
if !test {
pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger)
pipeline, err = loadNewPipeline(logOptsConfig, pm.hostname, pm.Logger)
if err != nil {
return nil, errors.Wrap(err, "error loading pipeline")
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/dockerlogbeat/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ To build and install, just run `mage Package`. The build process happens entire

## Running

`docker run --log-driver=elastic-logging-plugin:8.0.0 --log-opt output.elasticsearch.hosts="172.18.0.2:9200" --log-opt output.elasticsearch.index="dockerbeat-test" -it debian:jessie /bin/bash`
`docker run --log-driver=elastic/elastic-logging-plugin:8.0.0 --log-opt hosts="172.18.0.2:9200" -it debian:jessie /bin/bash`


## Config Options
Expand Down Expand Up @@ -57,4 +57,4 @@ This plugin fully supports `docker logs`, and it maintains a local copy of logs
docker plugin set d805664c550e DESTROY_LOGS_ON_STOP=true
```

You can also set `max-file`, `max-size` and `compress` via `--log-opts`
You can also set `max-file`, `max-size` and `compress` via `--log-opts`

0 comments on commit 40cb94c

Please sign in to comment.