Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #19531 to 7.x: Add docker logs support to the Elastic Log Driver #19808

Merged
merged 1 commit into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,9 @@ field. You can revert this change by configuring tags for the module and omittin
- Add support for event IDs 4673,4674,4697,4698,4699,4700,4701,4702,4768,4769,4770,4771,4776,4778,4779,4964 to the Security module {pull}17517[17517]
- Add registry and code signature information and ECS categorization fields for sysmon module {pull}18058[18058]

*Elastic Log Driver*
- Add support for `docker logs` command {pull}19531[19531]

==== Deprecated

*Affecting all Beats*
Expand Down
24 changes: 24 additions & 0 deletions x-pack/dockerlogbeat/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,31 @@
],
"socket": "beatSocket.sock"
},
"mounts": [
{
"name": "LOG_DIR",
"description": "Mount for local log cache",
"destination": "/var/log/docker",
"source": "/var/lib/docker",
"type": "none",
"options": [
"rw",
"rbind"
],
"Settable": [
"source"
]
}
],
"env": [
{
"description": "Destroy logs after a container has stopped",
"name": "DESTROY_LOGS_ON_STOP",
"value": "false",
"Settable": [
"value"
]
},
{
"description": "debug level",
"name": "LOG_DRIVER_LEVEL",
Expand Down
73 changes: 69 additions & 4 deletions x-pack/dockerlogbeat/docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ format is `"username:password"`.
[[es-output-options]]
=== {es} output options

// TODO: Add the following settings. Syntax is a little different so we might
// need to add deameon examples that show how to specify these settings:
// `output.elasticsearch.indices
// `output.elasticsearch.pipelines`

[options="header"]
|=====
Expand Down Expand Up @@ -117,3 +113,72 @@ for more information about the environment variables.


|=====


[float]
[[local-log-opts]]
=== Configuring the local log
This plugin fully supports `docker logs`, and it maintains a local copy of logs that can be read without a connection to Elasticsearch. The plugin mounts the `/var/lib/docker` directory on the host to write logs to `/var/log/containers` on the host. If you want to change the log location on the host, you must change the mount inside the plugin:

1. Disable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin disable elastic/{log-driver-alias}:{version}
----

2. Set the bindmount directory:
+
["source","sh",subs="attributes"]
----
docker plugin set elastic/{log-driver-alias}:{version} LOG_DIR.source=NEW_LOG_LOCATION
----
+

3. Enable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin enable elastic/{log-driver-alias}:{version}
----


The local log also supports the `max-file`, `max-size` and `compress` options that are https://docs.docker.com/config/containers/logging/json-file/#options[a part of the Docker default file logger]. For example:

["source","sh",subs="attributes"]
----
docker run --log-driver=elastic/{log-driver-alias}:{version} \
--log-opt endpoint="myhost:9200" \
--log-opt user="myusername" \
--log-opt password="mypassword" \
--log-opt max-file=10 \
--log-opt max-size=5M \
--log-opt compress=true \
-it debian:jessie /bin/bash
----


In situations where logs can't be easily managed, for example, you can also configure the plugin to remove log files when a container is stopped. This will prevent you from reading logs on a stopped container, but it will rotate logs without user intervention. To enable removal of logs for stopped containers, you must change the `DESTROY_LOGS_ON_STOP` environment variable:

1. Disable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin disable elastic/{log-driver-alias}:{version}
----

2. Enable log removal:
+
["source","sh",subs="attributes"]
----
docker plugin set elastic/{log-driver-alias}:{version} DESTROY_LOGS_ON_STOP=true
----
+

3. Enable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin enable elastic/{log-driver-alias}:{version}
----

50 changes: 48 additions & 2 deletions x-pack/dockerlogbeat/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package main

import (
"encoding/json"
"io"
"net/http"

"github.com/docker/docker/daemon/logger"

"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemanager"

"github.com/docker/docker/pkg/ioutils"
"github.com/pkg/errors"
)

Expand All @@ -26,6 +28,26 @@ type StopLoggingRequest struct {
File string
}

// capabilitiesResponse represents the response to a capabilities request
type capabilitiesResponse struct {
Err string
Cap logger.Capability
}

// logsRequest represents the request object we get from a `docker logs` call
type logsRequest struct {
Info logger.Info
Config logger.ReadConfig
}

func reportCaps() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(&capabilitiesResponse{
Cap: logger.Capability{ReadLogs: true},
})
}
}

// This gets called when a container starts that requests the log driver
func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -36,7 +58,7 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon
return
}

pm.Logger.Infof("Got start request object from container %#v\n", startReq.Info.ContainerName)
pm.Logger.Debugf("Got start request object from container %#v\n", startReq.Info.ContainerName)
pm.Logger.Debugf("Got a container with the following labels: %#v\n", startReq.Info.ContainerLabels)
pm.Logger.Debugf("Got a container with the following log opts: %#v\n", startReq.Info.Config)

Expand Down Expand Up @@ -67,7 +89,7 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}
pm.Logger.Infof("Got stop request object %#v\n", stopReq)
pm.Logger.Debugf("Got stop request object %#v\n", stopReq)
// Run the stop async, since nothing 'depends' on it,
// and we can break people's docker automation if this times out.
go func() {
Expand All @@ -81,6 +103,30 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
} // end func
}

func readLogHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var logReq logsRequest
err := json.NewDecoder(r.Body).Decode(&logReq)
if err != nil {
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}

pm.Logger.Debugf("Got logging request for container %s\n", logReq.Info.ContainerName)
stream, err := pm.CreateReaderForContainer(logReq.Info, logReq.Config)
if err != nil {
http.Error(w, errors.Wrap(err, "error creating log reader").Error(), http.StatusBadRequest)
return
}
defer stream.Close()
w.Header().Set("Content-Type", "application/x-json-stream")
wf := ioutils.NewWriteFlusher(w)
defer wf.Close()
io.Copy(wf, stream)

} //end func
}

// For the start/stop handler, the daemon expects back an error object. If the body is empty, then all is well.
func respondOK(w http.ResponseWriter) {
res := struct {
Expand Down
17 changes: 16 additions & 1 deletion x-pack/dockerlogbeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package main
import (
"fmt"
"os"
"strconv"

"github.com/docker/go-plugins-helpers/sdk"

Expand Down Expand Up @@ -41,6 +42,14 @@ func genNewMonitoringConfig() (*common.Config, error) {
return cfg, nil
}

func setDestroyLogsOnStop() (bool, error) {
setting, ok := os.LookupEnv("DESTROY_LOGS_ON_STOP")
if !ok {
return false, nil
}
return strconv.ParseBool(setting)
}

func fatal(format string, vs ...interface{}) {
fmt.Fprintf(os.Stderr, format, vs...)
os.Exit(1)
Expand All @@ -60,12 +69,18 @@ func main() {
fatal("error starting log handler: %s", err)
}

pipelines := pipelinemanager.NewPipelineManager(logcfg)
logDestroy, err := setDestroyLogsOnStop()
if err != nil {
fatal("DESTROY_LOGS_ON_STOP must be 'true' or 'false': %s", err)
}
pipelines := pipelinemanager.NewPipelineManager(logDestroy)

sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
// Create handlers for startup and shutdown of the log driver
sdkHandler.HandleFunc("/LogDriver.StartLogging", startLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.StopLogging", stopLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.Capabilities", reportCaps())
sdkHandler.HandleFunc("/LogDriver.ReadLogs", readLogHandler(pipelines))

err = sdkHandler.ServeUnix("beatSocket", 0)
if err != nil {
Expand Down
67 changes: 49 additions & 18 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package pipelinemanager

import (
"os"
"io"
"strings"
"time"

"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"

"github.com/docker/docker/api/types/backend"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
Expand All @@ -20,20 +22,26 @@ import (
"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader"
)

// ClientLogger is an instance of a pipeline logger client meant for reading from a single log stream
// There's a many-to-one relationship between clients and pipelines.
// Each container with the same config will get its own client to the same pipeline.
// ClientLogger collects logs for a docker container logging to stdout and stderr, using the FIFO provided by the docker daemon.
// Each log line is written to a local log file for retrieval via "docker logs", and forwarded to the beats publisher pipeline.
// The local log storage is based on the docker json-file logger and supports the same settings. If "max-size" is not configured, we will rotate the log file every 10MB.
type ClientLogger struct {
logFile *pipereader.PipeReader
client beat.Client
pipelineHash uint64
closer chan struct{}
containerMeta logger.Info
logger *logp.Logger
// pipelineHash is a hash of the libbeat publisher pipeline config
pipelineHash uint64
// logger is the internal error message logger
logger *logp.Logger
// ContainerMeta is the metadata object for the container we get from docker
ContainerMeta logger.Info
// logFile is the FIFO reader that reads from the docker container stdio
logFile *pipereader.PipeReader
// client is the libbeat client object that sends logs upstream
client beat.Client
// localLog manages the local JSON logs for containers
localLog logger.Logger
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info) (*ClientLogger, error) {
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
// setup the beat client
settings := beat.ClientConfig{
WaitClose: 0,
Expand All @@ -50,7 +58,12 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade

clientLogger.Debugf("Created new logger for %d", hash)

return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hash, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil
return &ClientLogger{logFile: inputFile,
client: client,
pipelineHash: hash,
ContainerMeta: info,
localLog: localLog,
logger: clientLogger}, nil
}

// Close closes the pipeline client and reader
Expand All @@ -64,7 +77,6 @@ func (cl *ClientLogger) Close() error {
// ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client
func (cl *ClientLogger) ConsumePipelineAndSend() {
publishWriter := make(chan logdriver.LogEntry, 500)

go cl.publishLoop(publishWriter)
// Clean up the reader after we're done
defer func() {
Expand All @@ -76,7 +88,10 @@ func (cl *ClientLogger) ConsumePipelineAndSend() {
for {
err := cl.logFile.ReadMessage(&log)
if err != nil {
cl.logger.Error(os.Stderr, "Error getting message: %s\n", err)
if err == io.EOF {
return
}
cl.logger.Errorf("Error getting message: %s\n", err)
return
}
publishWriter <- log
Expand All @@ -96,18 +111,19 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
return
}

cl.localLog.Log(constructLogSpoolMsg(entry))
line := strings.TrimSpace(string(entry.Line))

cl.client.Publish(beat.Event{
Timestamp: time.Unix(0, entry.TimeNano),
Fields: common.MapStr{
"message": line,
"container": common.MapStr{
"labels": helper.DeDotLabels(cl.containerMeta.ContainerLabels, true),
"id": cl.containerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.containerMeta.ContainerName}),
"labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true),
"id": cl.ContainerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
"image": common.MapStr{
"name": cl.containerMeta.ContainerImageName,
"name": cl.ContainerMeta.ContainerImageName,
},
},
},
Expand All @@ -116,3 +132,18 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
}

}

func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
var msg logger.Message

msg.Line = line.Line
msg.Source = line.Source
msg.Timestamp = time.Unix(0, line.TimeNano)
if line.PartialLogMetadata != nil {
msg.PLogMetaData = &backend.PartialLogMetaData{}
msg.PLogMetaData.ID = line.PartialLogMetadata.Id
msg.PLogMetaData.Last = line.PartialLogMetadata.Last
msg.PLogMetaData.Ordinal = int(line.PartialLogMetadata.Ordinal)
}
return &msg
}
Loading