Fix ECS fields in Elastic Log Driver, change index prefix #20522

Merged
CHANGELOG.next.asciidoc (1 change: 1 addition & 0 deletions)

@@ -658,6 +658,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 
 *Elastic Log Driver*
 - Add support for `docker logs` command {pull}19531[19531]
+- Add support to change the beat name, and support for Kibana Logs. {pull}20522[20522]
 
 ==== Deprecated
 
x-pack/dockerlogbeat/docs/configuration.asciidoc (5 changes: 5 additions & 0 deletions)

@@ -76,6 +76,11 @@ indices. For example: +"dockerlogs-%{+yyyy.MM.dd}"+.
 
 3+|*Advanced:*
 
+|`name`
+|`testbeat`
+| A custom value that will be inserted into the document as `agent.name`.
+If not set, the hostname of the Docker host is used.
+
 |`backoff_init`
 |`1s`
 |The number of seconds to wait before trying to reconnect to {es} after
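For example, the new `name` option is passed like any other `--log-opt` when starting a container (the host address and name values here are illustrative):

```
docker run --log-driver=elastic/elastic-logging-plugin:8.0.0 \
  --log-opt hosts="172.18.0.2:9200" \
  --log-opt name="my-docker-host" \
  -it debian:jessie /bin/bash
```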
x-pack/dockerlogbeat/main.go (8 changes: 7 additions & 1 deletion)

@@ -73,7 +73,13 @@ func main() {
 	if err != nil {
 		fatal("DESTROY_LOGS_ON_STOP must be 'true' or 'false': %s", err)
 	}
-	pipelines := pipelinemanager.NewPipelineManager(logDestroy)
+
+	hostname, err := os.Hostname()
+	if err != nil {
+		fatal("Error fetching hostname: %s", err)
+	}
+
Review comment on the hostname lookup above (a rough sketch of the suggestion follows this file's diff):

> Instead of creating and passing the hostname around from here, would it make sense to create a beat.Info instead?
+	pipelines := pipelinemanager.NewPipelineManager(logDestroy, hostname)
 
 	sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
 	// Create handlers for startup and shutdown of the log driver
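A minimal sketch of the reviewer's suggestion (hypothetical, not what the PR merged; the field values are borrowed from `getBeatInfo` further down):

```go
// Hypothetical alternative from the review: build a beat.Info once in main()
// and pass it down, instead of threading a raw hostname string through.
hostname, err := os.Hostname()
if err != nil {
	fatal("Error fetching hostname: %s", err)
}
info := beat.Info{
	Beat:        "elastic-log-driver",
	IndexPrefix: "logs-docker",
	Hostname:    hostname,
	Version:     version.GetDefaultVersion(),
}
// NewPipelineManager would need a matching signature change (hypothetical).
pipelines := pipelinemanager.NewPipelineManager(logDestroy, info)
```

One wrinkle with this approach: `beat.Info.Name` comes from the per-container `name` log-opt, which is only known once a container's config arrives, which may be why the PR passes the hostname instead.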
x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go (52 changes: 36 additions & 16 deletions)

@@ -32,16 +32,20 @@ type ClientLogger struct {
 	logger *logp.Logger
 	// ContainerMeta is the metadata object for the container we get from docker
 	ContainerMeta logger.Info
+	// ContainerECSMeta is a container metadata object appended to every event
+	ContainerECSMeta common.MapStr
 	// logFile is the FIFO reader that reads from the docker container stdio
 	logFile *pipereader.PipeReader
 	// client is the libbeat client object that sends logs upstream
 	client beat.Client
 	// localLog manages the local JSON logs for containers
 	localLog logger.Logger
+	// hostname for event metadata
+	hostname string
 }
 
 // newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
-func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
+func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger, hostname string) (*ClientLogger, error) {
 	// setup the beat client
 	settings := beat.ClientConfig{
 		WaitClose: 0,
@@ -59,11 +63,13 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade
 	clientLogger.Debugf("Created new logger for %d", hash)
 
 	return &ClientLogger{logFile: inputFile,
-		client:        client,
-		pipelineHash:  hash,
-		ContainerMeta: info,
-		localLog:      localLog,
-		logger:        clientLogger}, nil
+		client:           client,
+		pipelineHash:     hash,
+		ContainerMeta:    info,
+		ContainerECSMeta: constructECSContainerData(info),
+		localLog:         localLog,
+		logger:           clientLogger,
+		hostname:         hostname}, nil
 }

 // Close closes the pipeline client and reader
@@ -100,6 +106,26 @@ func (cl *ClientLogger) ConsumePipelineAndSend() {
 	}
 }
 
+// constructECSContainerData creates an ES-ready common.MapStr object with container metadata.
+func constructECSContainerData(metadata logger.Info) common.MapStr {
+
+	var containerImageName, containerImageTag string
+	if idx := strings.IndexRune(metadata.ContainerImageName, ':'); idx >= 0 {
+		containerImageName = string([]rune(metadata.ContainerImageName)[:idx])
+		containerImageTag = string([]rune(metadata.ContainerImageName)[idx+1:])
+	}
+
+	return common.MapStr{
+		"labels": helper.DeDotLabels(metadata.ContainerLabels, true),
+		"id":     metadata.ContainerID,
+		"name":   helper.ExtractContainerName([]string{metadata.ContainerName}),
+		"image": common.MapStr{
+			"name": containerImageName,
+			"tag":  containerImageTag,
+		},
+	}
+}
+
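A quick illustration of the tag-splitting behavior above (values made up, not part of the PR): an image reference like `redis:6.0` is split into name and tag, while a reference with no `:` never enters the `idx >= 0` branch, so both fields stay empty strings:

```go
// Illustration only; derived from the function above.
meta := logger.Info{
	ContainerName:      "testContainer",
	ContainerImageName: "redis:6.0",
}
img := constructECSContainerData(meta)["image"].(common.MapStr)
fmt.Println(img["name"], img["tag"]) // redis 6.0

meta.ContainerImageName = "debian" // no tag in the reference
img = constructECSContainerData(meta)["image"].(common.MapStr)
fmt.Println(img["name"], img["tag"]) // two empty strings
```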
 // publishLoop sits in a loop and waits for events to publish.
 // Publish() can block if there is an upstream output issue. This is a problem because, if the FIFO queues that handle the docker logs fill up, plugins can no longer send logs.
 // A buffered channel with its own publish loop gives us a little more wiggle room.
@@ -117,20 +143,14 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
 		cl.client.Publish(beat.Event{
 			Timestamp: time.Unix(0, entry.TimeNano),
 			Fields: common.MapStr{
-				"message": line,
-				"container": common.MapStr{
-					"labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true),
-					"id":     cl.ContainerMeta.ContainerID,
-					"name":   helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
-					"image": common.MapStr{
-						"name": cl.ContainerMeta.ContainerImageName,
-					},
-				},
+				"message":   line,
+				"container": cl.ContainerECSMeta,
+				"host": common.MapStr{
+					"name": cl.hostname,
+				},
 			},
 		})
-
 	}
-
 }
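The comment above describes the decoupling pattern. A minimal sketch of the consume side, assuming a `ReadMessage` method on the FIFO reader (the real `ConsumePipelineAndSend` body is not fully shown in this diff):

```go
// Sketch of the consume/publish decoupling; assumed wiring, not merged code.
queue := make(chan logdriver.LogEntry, 1000) // buffer absorbs short output stalls
go cl.publishLoop(queue)

for {
	var entry logdriver.LogEntry
	if err := cl.logFile.ReadMessage(&entry); err != nil {
		break // EOF or read error: stop draining the FIFO
	}
	queue <- entry // only blocks once the buffer is full
}
close(queue)
```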

 func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go (12 changes: 2 additions & 10 deletions)

@@ -76,15 +76,7 @@ func setupTestReader(t *testing.T, logString string, containerConfig logger.Info
 }
 
 // createNewClient sets up the "write side" of the pipeline, creating a log event to write and send back into the test.
-func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock.MockPipelineConnector, containerConfig logger.Info) *ClientLogger {
-	// an example container metadata struct
-	cfgObject := logger.Info{
-		Config:             map[string]string{"output.elasticsearch": "localhost:9200"},
-		ContainerLabels:    map[string]string{"test.label": "test"},
-		ContainerID:        "3acc92989a97c415905eba090277b8a8834d087e58a95bed55450338ce0758dd",
-		ContainerName:      "testContainer",
-		ContainerImageName: "TestImage",
-	}
+func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock.MockPipelineConnector, cfgObject logger.Info) *ClientLogger {
 
 	// create a new pipeline reader for use with the libbeat client
 	reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString))
@@ -100,7 +92,7 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock
 	localLog, err := jsonfilelog.New(info)
 	assert.NoError(t, err)
 
-	client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog)
+	client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog, "test")
 	require.NoError(t, err)
 
 	return client
x-pack/dockerlogbeat/pipelinemanager/config.go (2 changes: 2 additions & 0 deletions)

@@ -27,6 +27,7 @@ type ContainerOutputConfig struct {
 	CloudID   string `struct:"cloud.id,omitempty"`
 	CloudAuth string `struct:"cloud.auth,omitempty"`
 	ProxyURL  string `struct:"output.elasticsearch.proxy_url,omitempty"`
+	BeatName  string `struct:"-"`
 }
 
 // NewCfgFromRaw returns a ContainerOutputConfig based on a raw config we get from the API
@@ -53,6 +54,7 @@ func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) {
 	newCfg.Timeout = input["timeout"]
 	newCfg.BackoffInit = input["backoff_init"]
 	newCfg.BackoffMax = input["backoff_max"]
+	newCfg.BeatName = input["name"]
 
 	return newCfg, nil
 }
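The `struct:"-"` tag keeps `name` out of the rendered output configuration; it is only consumed when building the `beat.Info` (see `getBeatInfo` below). A hedged sketch of the flow, with the map literal standing in for the log-opts Docker hands the plugin:

```go
// input mirrors the --log-opt key/value pairs Docker passes to the plugin.
input := map[string]string{
	"hosts": "172.18.0.2:9200",
	"name":  "my-docker-host", // hypothetical value
}
cfg, err := pipelinemanager.NewCfgFromRaw(input)
if err != nil {
	panic(err) // sketch only; real callers propagate the error
}
// BeatName rides on the struct but, thanks to `struct:"-"`, is never
// serialized into the beats output config.
fmt.Println(cfg.BeatName) // "my-docker-host", which becomes beat.Info.Name (agent.name)
```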
x-pack/dockerlogbeat/pipelinemanager/libbeattools.go (33 changes: 9 additions & 24 deletions)

@@ -19,7 +19,6 @@ import (
 
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/cloudid"
-	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/common/file"
 	"github.com/elastic/beats/v7/libbeat/logp"
 	"github.com/elastic/beats/v7/libbeat/outputs"
@@ -49,8 +48,7 @@ func makeConfigHash(cfg map[string]string) string {
 }
 
 // loadNewPipeline starts up a new pipeline with the given config
-func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp.Logger) (*Pipeline, error) {
-
+func loadNewPipeline(logOptsConfig ContainerOutputConfig, hostname string, log *logp.Logger) (*Pipeline, error) {
 	cfg, err := logOptsConfig.CreateConfig()
 	if err != nil {
 		return nil, err
@@ -68,7 +66,7 @@ func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp
 		return nil, fmt.Errorf("unpacking config failed: %v", err)
 	}
 
-	info, err := getBeatInfo(cfg)
+	info, err := getBeatInfo(logOptsConfig, hostname)
 	if err != nil {
 		return nil, err
 	}
@@ -131,38 +129,25 @@ func parseCfgKeys(cfg map[string]string) (map[string]interface{}, error) {
 }
 
 // getBeatInfo returns the beat.Info type needed to start the pipeline
-func getBeatInfo(cfg *common.Config) (beat.Info, error) {
+func getBeatInfo(pluginOpts ContainerOutputConfig, hostname string) (beat.Info, error) {
 	vers := version.GetDefaultVersion()
-	hostname, err := os.Hostname()
-	if err != nil {
-		return beat.Info{}, errors.Wrap(err, "error getting hostname")
-	}
 
 	eid, err := uuid.NewV4()
 	if err != nil {
 		return beat.Info{}, errors.Wrap(err, "error creating ephemeral ID")
 	}
 
-	type nameStr struct {
-		Name string `config:"name"`
-	}
-	name := nameStr{}
-	err = cfg.Unpack(&name)
-	if err != nil {
-		return beat.Info{}, fmt.Errorf("unpacking config failed: %v", err)
-	}
-
-	if name.Name == "" {
-		name.Name = "elastic-log-driver"
-	}
 	id, err := loadMeta("/tmp/meta.json")
 	if err != nil {
 		return beat.Info{}, errors.Wrap(err, "error loading UUID")
 	}
 
+	beatName := "elastic-log-driver"
+
 	info := beat.Info{
-		Beat:        name.Name,
-		Name:        name.Name,
-		IndexPrefix: name.Name,
+		Beat:        beatName,
+		Name:        pluginOpts.BeatName,
+		IndexPrefix: "logs-docker",
 		Hostname:    hostname,
 		Version:     vers,
 		EphemeralID: eid,
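Two notes on the hunk above. First, with `IndexPrefix` hard-coded to `logs-docker`, events land in driver-named indices regardless of the configured beat name; under the beats default index pattern that would be something like `logs-docker-8.0.0-2020.08.12` (illustrative, not from this diff). Second, `loadMeta` is referenced but not shown; a plausible sketch of such a helper, assuming it persists a stable UUID across driver restarts:

```go
// Hypothetical sketch of loadMeta; the real implementation is not part of
// this diff. It keeps a stable beat UUID at path across plugin restarts.
func loadMeta(path string) (uuid.UUID, error) {
	raw, err := ioutil.ReadFile(path)
	if err == nil {
		return uuid.FromString(strings.TrimSpace(string(raw)))
	}
	if !os.IsNotExist(err) {
		return uuid.Nil, errors.Wrap(err, "error reading meta file")
	}
	// First run: generate a UUID and persist it for next time.
	id, err := uuid.NewV4()
	if err != nil {
		return uuid.Nil, err
	}
	return id, ioutil.WriteFile(path, []byte(id.String()), 0600)
}
```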
x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go (13 changes: 8 additions & 5 deletions)

@@ -55,17 +55,20 @@ type PipelineManager struct {
 	logDirectory string
 	// destroyLogsOnStop indicates for the client to remove log files when a container stops
 	destroyLogsOnStop bool
+	// hostname of the docker host
+	hostname string
 }
 
 // NewPipelineManager creates a new Pipeline map
-func NewPipelineManager(logDestroy bool) *PipelineManager {
+func NewPipelineManager(logDestroy bool, hostname string) *PipelineManager {
 	return &PipelineManager{
 		Logger:            logp.NewLogger("PipelineManager"),
 		pipelines:         make(map[uint64]*Pipeline),
 		clients:           make(map[string]*ClientLogger),
 		clientLogger:      make(map[string]logger.Logger),
 		logDirectory:      "/var/log/docker/containers",
 		destroyLogsOnStop: logDestroy,
+		hostname:          hostname,
 	}
 }
 
@@ -102,7 +105,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating config hash")
 	}
-	pipeline, err := pm.getOrCreatePipeline(containerConfig, file, hashstring)
+	pipeline, err := pm.getOrCreatePipeline(containerConfig, hashstring)
 	if err != nil {
 		return nil, errors.Wrap(err, "error getting pipeline")
 	}
@@ -135,7 +138,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
 	}
 
 	// actually get to crafting the new client.
-	cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog)
+	cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog, pm.hostname)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating client")
 	}
@@ -198,15 +201,15 @@ func (pm *PipelineManager) CreateReaderForContainer(info logger.Info, config log
 
 // getOrCreatePipeline performs the pipeline check and creation as one atomic operation.
 // It will either return a new pipeline, or an existing one from the pipeline map.
-func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, file string, hash uint64) (*Pipeline, error) {
+func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, hash uint64) (*Pipeline, error) {
 	pm.mu.Lock()
 	defer pm.mu.Unlock()
 
 	var pipeline *Pipeline
 	var err error
 	pipeline, test := pm.pipelines[hash]
 	if !test {
-		pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger)
+		pipeline, err = loadNewPipeline(logOptsConfig, pm.hostname, pm.Logger)
 		if err != nil {
 			return nil, errors.Wrap(err, "error loading pipeline")
 		}
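Taken together, the hostname now flows through a single path from the plugin process to every published event. A condensed schematic (summarized from the diff, not a verbatim excerpt):

```go
// main.go: resolve once at startup.
hostname, _ := os.Hostname()
pm := pipelinemanager.NewPipelineManager(logDestroy, hostname)

// Per container, the manager threads it along:
//   pm.getOrCreatePipeline(cfg, hash)        -> loadNewPipeline(cfg, pm.hostname, log)
//   newClientFromPipeline(..., pm.hostname)  -> ClientLogger.hostname
//   publishLoop: Fields["host"]["name"] = cl.hostname
```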
x-pack/dockerlogbeat/readme.md (4 changes: 2 additions & 2 deletions)

@@ -9,7 +9,7 @@ To build and install, just run `mage Package`. The build process happens entire
 
 ## Running
 
-`docker run --log-driver=elastic-logging-plugin:8.0.0 --log-opt output.elasticsearch.hosts="172.18.0.2:9200" --log-opt output.elasticsearch.index="dockerbeat-test" -it debian:jessie /bin/bash`
+`docker run --log-driver=elastic/elastic-logging-plugin:8.0.0 --log-opt hosts="172.18.0.2:9200" -it debian:jessie /bin/bash`
 
 
 ## Config Options
@@ -57,4 +57,4 @@ This plugin fully supports `docker logs`, and it maintains a local copy of logs
 docker plugin set d805664c550e DESTROY_LOGS_ON_STOP=true
 ```
 
-You can also set `max-file`, `max-size` and `compress` via `--log-opts`
\ No newline at end of file
+You can also set `max-file`, `max-size` and `compress` via `--log-opts`