Skip to content

Commit

Permalink
Add docker logs support to the Elastic Log Driver (elastic#19531)
Browse files Browse the repository at this point in the history
* init commit of docker logs support

* remove vendor

* fix tests

* mage fmt

* code cleanup

* change logging directories around, add code to remove containers

* remove error message on EOF

* fix config, add docs

* more fixes, migrate to writing logs to docker's own location

* add changelog entry

* docs cleanup
  • Loading branch information
fearful-symmetry authored and melchiormoulin committed Oct 14, 2020
1 parent beaa08c commit 05f82e7
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add registry and code signature information and ECS categorization fields for sysmon module {pull}18058[18058]
- Add new winlogbeat security dashboard {pull}18775[18775]

*Elastic Log Driver*
- Add support for `docker logs` command {pull}19531[19531]

==== Deprecated

*Affecting all Beats*
Expand Down
24 changes: 24 additions & 0 deletions x-pack/dockerlogbeat/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,31 @@
],
"socket": "beatSocket.sock"
},
"mounts": [
{
"name": "LOG_DIR",
"description": "Mount for local log cache",
"destination": "/var/log/docker",
"source": "/var/lib/docker",
"type": "none",
"options": [
"rw",
"rbind"
],
"Settable": [
"source"
]
}
],
"env": [
{
"description": "Destroy logs after a container has stopped",
"name": "DESTROY_LOGS_ON_STOP",
"value": "false",
"Settable": [
"value"
]
},
{
"description": "debug level",
"name": "LOG_DRIVER_LEVEL",
Expand Down
73 changes: 69 additions & 4 deletions x-pack/dockerlogbeat/docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ format is `"username:password"`.
[[es-output-options]]
=== {es} output options

// TODO: Add the following settings. Syntax is a little different so we might
// need to add deameon examples that show how to specify these settings:
// `output.elasticsearch.indices
// `output.elasticsearch.pipelines`

[options="header"]
|=====
Expand Down Expand Up @@ -117,3 +113,72 @@ for more information about the environment variables.


|=====


[float]
[[local-log-opts]]
=== Configuring the local log
This plugin fully supports `docker logs`, and it maintains a local copy of logs that can be read without a connection to Elasticsearch. The plugin mounts the `/var/lib/docker` directory on the host to write logs to `/var/log/containers` on the host. If you want to change the log location on the host, you must change the mount inside the plugin:

1. Disable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin disable elastic/{log-driver-alias}:{version}
----

2. Set the bindmount directory:
+
["source","sh",subs="attributes"]
----
docker plugin set elastic/{log-driver-alias}:{version} LOG_DIR.source=NEW_LOG_LOCATION
----
+

3. Enable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin enable elastic/{log-driver-alias}:{version}
----


The local log also supports the `max-file`, `max-size` and `compress` options that are https://docs.docker.com/config/containers/logging/json-file/#options[a part of the Docker default file logger]. For example:

["source","sh",subs="attributes"]
----
docker run --log-driver=elastic/{log-driver-alias}:{version} \
--log-opt endpoint="myhost:9200" \
--log-opt user="myusername" \
--log-opt password="mypassword" \
--log-opt max-file=10 \
--log-opt max-size=5M \
--log-opt compress=true \
-it debian:jessie /bin/bash
----


In situations where logs can't be easily managed, for example, you can also configure the plugin to remove log files when a container is stopped. This will prevent you from reading logs on a stopped container, but it will rotate logs without user intervention. To enable removal of logs for stopped containers, you must change the `DESTROY_LOGS_ON_STOP` environment variable:

1. Disable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin disable elastic/{log-driver-alias}:{version}
----

2. Enable log removal:
+
["source","sh",subs="attributes"]
----
docker plugin set elastic/{log-driver-alias}:{version} DESTROY_LOGS_ON_STOP=true
----
+

3. Enable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin enable elastic/{log-driver-alias}:{version}
----

50 changes: 48 additions & 2 deletions x-pack/dockerlogbeat/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package main

import (
"encoding/json"
"io"
"net/http"

"github.com/docker/docker/daemon/logger"

"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemanager"

"github.com/docker/docker/pkg/ioutils"
"github.com/pkg/errors"
)

Expand All @@ -26,6 +28,26 @@ type StopLoggingRequest struct {
File string
}

// capabilitiesResponse represents the response to a capabilities request
type capabilitiesResponse struct {
Err string
Cap logger.Capability
}

// logsRequest represents the request object we get from a `docker logs` call
type logsRequest struct {
Info logger.Info
Config logger.ReadConfig
}

func reportCaps() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(&capabilitiesResponse{
Cap: logger.Capability{ReadLogs: true},
})
}
}

// This gets called when a container starts that requests the log driver
func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -36,7 +58,7 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon
return
}

pm.Logger.Infof("Got start request object from container %#v\n", startReq.Info.ContainerName)
pm.Logger.Debugf("Got start request object from container %#v\n", startReq.Info.ContainerName)
pm.Logger.Debugf("Got a container with the following labels: %#v\n", startReq.Info.ContainerLabels)
pm.Logger.Debugf("Got a container with the following log opts: %#v\n", startReq.Info.Config)

Expand Down Expand Up @@ -67,7 +89,7 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}
pm.Logger.Infof("Got stop request object %#v\n", stopReq)
pm.Logger.Debugf("Got stop request object %#v\n", stopReq)
// Run the stop async, since nothing 'depends' on it,
// and we can break people's docker automation if this times out.
go func() {
Expand All @@ -81,6 +103,30 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
} // end func
}

func readLogHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var logReq logsRequest
err := json.NewDecoder(r.Body).Decode(&logReq)
if err != nil {
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}

pm.Logger.Debugf("Got logging request for container %s\n", logReq.Info.ContainerName)
stream, err := pm.CreateReaderForContainer(logReq.Info, logReq.Config)
if err != nil {
http.Error(w, errors.Wrap(err, "error creating log reader").Error(), http.StatusBadRequest)
return
}
defer stream.Close()
w.Header().Set("Content-Type", "application/x-json-stream")
wf := ioutils.NewWriteFlusher(w)
defer wf.Close()
io.Copy(wf, stream)

} //end func
}

// For the start/stop handler, the daemon expects back an error object. If the body is empty, then all is well.
func respondOK(w http.ResponseWriter) {
res := struct {
Expand Down
17 changes: 16 additions & 1 deletion x-pack/dockerlogbeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package main
import (
"fmt"
"os"
"strconv"

"github.com/docker/go-plugins-helpers/sdk"

Expand Down Expand Up @@ -41,6 +42,14 @@ func genNewMonitoringConfig() (*common.Config, error) {
return cfg, nil
}

func setDestroyLogsOnStop() (bool, error) {
setting, ok := os.LookupEnv("DESTROY_LOGS_ON_STOP")
if !ok {
return false, nil
}
return strconv.ParseBool(setting)
}

func fatal(format string, vs ...interface{}) {
fmt.Fprintf(os.Stderr, format, vs...)
os.Exit(1)
Expand All @@ -60,12 +69,18 @@ func main() {
fatal("error starting log handler: %s", err)
}

pipelines := pipelinemanager.NewPipelineManager(logcfg)
logDestroy, err := setDestroyLogsOnStop()
if err != nil {
fatal("DESTROY_LOGS_ON_STOP must be 'true' or 'false': %s", err)
}
pipelines := pipelinemanager.NewPipelineManager(logDestroy)

sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
// Create handlers for startup and shutdown of the log driver
sdkHandler.HandleFunc("/LogDriver.StartLogging", startLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.StopLogging", stopLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.Capabilities", reportCaps())
sdkHandler.HandleFunc("/LogDriver.ReadLogs", readLogHandler(pipelines))

err = sdkHandler.ServeUnix("beatSocket", 0)
if err != nil {
Expand Down
67 changes: 49 additions & 18 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package pipelinemanager

import (
"os"
"io"
"strings"
"time"

"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"

"github.com/docker/docker/api/types/backend"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
Expand All @@ -20,20 +22,26 @@ import (
"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader"
)

// ClientLogger is an instance of a pipeline logger client meant for reading from a single log stream
// There's a many-to-one relationship between clients and pipelines.
// Each container with the same config will get its own client to the same pipeline.
// ClientLogger collects logs for a docker container logging to stdout and stderr, using the FIFO provided by the docker daemon.
// Each log line is written to a local log file for retrieval via "docker logs", and forwarded to the beats publisher pipeline.
// The local log storage is based on the docker json-file logger and supports the same settings. If "max-size" is not configured, we will rotate the log file every 10MB.
type ClientLogger struct {
logFile *pipereader.PipeReader
client beat.Client
pipelineHash uint64
closer chan struct{}
containerMeta logger.Info
logger *logp.Logger
// pipelineHash is a hash of the libbeat publisher pipeline config
pipelineHash uint64
// logger is the internal error message logger
logger *logp.Logger
// ContainerMeta is the metadata object for the container we get from docker
ContainerMeta logger.Info
// logFile is the FIFO reader that reads from the docker container stdio
logFile *pipereader.PipeReader
// client is the libbeat client object that sends logs upstream
client beat.Client
// localLog manages the local JSON logs for containers
localLog logger.Logger
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info) (*ClientLogger, error) {
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
// setup the beat client
settings := beat.ClientConfig{
WaitClose: 0,
Expand All @@ -50,7 +58,12 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade

clientLogger.Debugf("Created new logger for %d", hash)

return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hash, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil
return &ClientLogger{logFile: inputFile,
client: client,
pipelineHash: hash,
ContainerMeta: info,
localLog: localLog,
logger: clientLogger}, nil
}

// Close closes the pipeline client and reader
Expand All @@ -64,7 +77,6 @@ func (cl *ClientLogger) Close() error {
// ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client
func (cl *ClientLogger) ConsumePipelineAndSend() {
publishWriter := make(chan logdriver.LogEntry, 500)

go cl.publishLoop(publishWriter)
// Clean up the reader after we're done
defer func() {
Expand All @@ -76,7 +88,10 @@ func (cl *ClientLogger) ConsumePipelineAndSend() {
for {
err := cl.logFile.ReadMessage(&log)
if err != nil {
cl.logger.Error(os.Stderr, "Error getting message: %s\n", err)
if err == io.EOF {
return
}
cl.logger.Errorf("Error getting message: %s\n", err)
return
}
publishWriter <- log
Expand All @@ -96,18 +111,19 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
return
}

cl.localLog.Log(constructLogSpoolMsg(entry))
line := strings.TrimSpace(string(entry.Line))

cl.client.Publish(beat.Event{
Timestamp: time.Unix(0, entry.TimeNano),
Fields: common.MapStr{
"message": line,
"container": common.MapStr{
"labels": helper.DeDotLabels(cl.containerMeta.ContainerLabels, true),
"id": cl.containerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.containerMeta.ContainerName}),
"labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true),
"id": cl.ContainerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
"image": common.MapStr{
"name": cl.containerMeta.ContainerImageName,
"name": cl.ContainerMeta.ContainerImageName,
},
},
},
Expand All @@ -116,3 +132,18 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
}

}

func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
var msg logger.Message

msg.Line = line.Line
msg.Source = line.Source
msg.Timestamp = time.Unix(0, line.TimeNano)
if line.PartialLogMetadata != nil {
msg.PLogMetaData = &backend.PartialLogMetaData{}
msg.PLogMetaData.ID = line.PartialLogMetadata.Id
msg.PLogMetaData.Last = line.PartialLogMetadata.Last
msg.PLogMetaData.Ordinal = int(line.PartialLogMetadata.Ordinal)
}
return &msg
}
Loading

0 comments on commit 05f82e7

Please sign in to comment.