Add docker logs support to the Elastic Log Driver #19531

Merged · 14 commits · Jul 9, 2020
Changes from 9 commits
24 changes: 24 additions & 0 deletions x-pack/dockerlogbeat/config.json
@@ -13,7 +13,31 @@
],
"socket": "beatSocket.sock"
},
"mounts": [
{
"name": "LOG_DIR",
"description": "Mount for local log cache",
"destination": "/var/log/docker",
"source": "/var/log",
"type": "none",
"options": [
"rw",
"rbind"
],
"Settable": [
"source"
]
}
],
"env": [
{
"description": "Destroy logs after a container has stopped",
"name": "DESTROY_LOGS_ON_STOP",
"value": "false",
"Settable": [
"value"
]
},
{
"description": "debug level",
"name": "LOG_DRIVER_LEVEL",
56 changes: 52 additions & 4 deletions x-pack/dockerlogbeat/docs/configuration.asciidoc
@@ -49,10 +49,6 @@ format is `"username:password"`.
[[es-output-options]]
=== {es} output options

// TODO: Add the following settings. Syntax is a little different so we might
// need to add deameon examples that show how to specify these settings:
// `output.elasticsearch.indices
// `output.elasticsearch.pipelines`

[options="header"]
|=====
@@ -117,3 +113,55 @@ for more information about the environment variables.


|=====


[float]
[[local-log-opts]]
=== Configuring the local log
This plugin fully supports `docker logs`, and it maintains a local log spool in case the upstream {es} connection is down. Unfortunately, due to limitations in the Docker plugin API, we can't clean up log files when a container is destroyed. The plugin mounts the `/var/log` directory on the host and writes logs to `/var/log/containers` on the host. To change this directory, you must change the mount inside the plugin:

1. Disable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin disable elastic/{log-driver-alias}:{version}
----

2. Set the new log location:
+
["source","sh",subs="attributes"]
----
docker plugin set elastic/{log-driver-alias}:{version} LOG_DIR.source=NEW_LOG_LOCATION
----
+

3. Enable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin enable elastic/{log-driver-alias}:{version}
----

In situations where logs can't be easily managed, for example on Docker for Mac, you can also configure the plugin to remove log files when a container is stopped. This prevents you from reading logs from a stopped container, but it rotates logs without user intervention. To enable removal of logs for stopped containers, you must change the `DESTROY_LOGS_ON_STOP` environment variable:

1. Disable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin disable elastic/{log-driver-alias}:{version}
----

2. Set the `DESTROY_LOGS_ON_STOP` environment variable:
+
["source","sh",subs="attributes"]
----
docker plugin set elastic/{log-driver-alias}:{version} DESTROY_LOGS_ON_STOP=true
----
+

3. Enable the plugin:
+
["source","sh",subs="attributes"]
----
docker plugin enable elastic/{log-driver-alias}:{version}
----
49 changes: 47 additions & 2 deletions x-pack/dockerlogbeat/handlers.go
@@ -6,12 +6,14 @@ package main

import (
"encoding/json"
"io"
"net/http"

"github.com/docker/docker/daemon/logger"

"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemanager"

"github.com/docker/docker/pkg/ioutils"
"github.com/pkg/errors"
)

@@ -26,6 +28,26 @@ type StopLoggingRequest struct {
File string
}

// capabilitiesResponse represents the response to a capabilities request
type capabilitiesResponse struct {
Err string
Cap logger.Capability
}

// logsRequest represents the request object we get from a `docker logs` call
type logsRequest struct {
Info logger.Info
Config logger.ReadConfig
}

func reportCaps() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(&capabilitiesResponse{
Cap: logger.Capability{ReadLogs: true},
})
}
}

// This gets called when a container starts that requests the log driver
func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
@@ -36,7 +58,7 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon
return
}

pm.Logger.Infof("Got start request object from container %#v\n", startReq.Info.ContainerName)
pm.Logger.Debugf("Got start request object from container %#v\n", startReq.Info.ContainerName)
pm.Logger.Debugf("Got a container with the following labels: %#v\n", startReq.Info.ContainerLabels)
pm.Logger.Debugf("Got a container with the following log opts: %#v\n", startReq.Info.Config)

@@ -67,7 +89,7 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}
pm.Logger.Infof("Got stop request object %#v\n", stopReq)
pm.Logger.Debugf("Got stop request object %#v\n", stopReq)
// Run the stop async, since nothing 'depends' on it,
// and we can break people's docker automation if this times out.
go func() {
@@ -81,6 +103,29 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
} // end func
}

func readLogHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var logReq logsRequest
err := json.NewDecoder(r.Body).Decode(&logReq)
if err != nil {
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}

pm.Logger.Debugf("Got logging request for container %s\n", logReq.Info.ContainerName)
stream, err := pm.CreateReaderForContainer(logReq.Info, logReq.Config)
if err != nil {
http.Error(w, errors.Wrap(err, "error creating log reader").Error(), http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "application/x-json-stream")
wf := ioutils.NewWriteFlusher(w)
io.Copy(wf, stream)
wf.Close()

} //end func
}

// For the start/stop handler, the daemon expects back an error object. If the body is empty, then all is well.
func respondOK(w http.ResponseWriter) {
res := struct {
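
For context, the `/LogDriver.Capabilities` response produced by `reportCaps` is what tells the Docker daemon it may route `docker logs` requests to this plugin. The following standalone sketch is not part of this PR; it uses simplified stand-ins for `logger.Capability` and `capabilitiesResponse` purely to show the JSON the daemon receives:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Simplified stand-ins for logger.Capability and capabilitiesResponse,
// used here only to illustrate the wire format of the capabilities reply.
type capability struct {
	ReadLogs bool
}

type capabilitiesResponse struct {
	Err string
	Cap capability
}

func reportCaps() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Advertise that this driver can serve `docker logs` requests.
		json.NewEncoder(w).Encode(&capabilitiesResponse{
			Cap: capability{ReadLogs: true},
		})
	}
}

func main() {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/LogDriver.Capabilities", nil)
	reportCaps()(rec, req)
	fmt.Print(rec.Body.String()) // prints: {"Err":"","Cap":{"ReadLogs":true}}
}
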
2 changes: 2 additions & 0 deletions x-pack/dockerlogbeat/main.go
@@ -66,6 +66,8 @@ func main() {
// Create handlers for startup and shutdown of the log driver
sdkHandler.HandleFunc("/LogDriver.StartLogging", startLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.StopLogging", stopLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.Capabilities", reportCaps())
sdkHandler.HandleFunc("/LogDriver.ReadLogs", readLogHandler(pipelines))

err = sdkHandler.ServeUnix("beatSocket", 0)
if err != nil {
48 changes: 38 additions & 10 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
@@ -5,13 +5,15 @@
package pipelinemanager

import (
"os"
"io"
"strings"
"time"

"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"

"github.com/docker/docker/api/types/backend"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
helper "github.com/elastic/beats/v7/libbeat/common/docker"
@@ -27,12 +29,13 @@ type ClientLogger struct {
client beat.Client
pipelineHash uint64
closer chan struct{}
containerMeta logger.Info
ContainerMeta logger.Info
logger *logp.Logger
logSpool logger.Logger
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info) (*ClientLogger, error) {
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
// setup the beat client
settings := beat.ClientConfig{
WaitClose: 0,
@@ -49,7 +52,13 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade

clientLogger.Debugf("Created new logger for %d", hash)

return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hash, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil
return &ClientLogger{logFile: inputFile,
client: client,
pipelineHash: hash,
closer: make(chan struct{}),
ContainerMeta: info,
logSpool: localLog,
logger: clientLogger}, nil
}

// Close closes the pipeline client and reader
@@ -63,7 +72,6 @@ func (cl *ClientLogger) Close() error {
// ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client
func (cl *ClientLogger) ConsumePipelineAndSend() {
publishWriter := make(chan logdriver.LogEntry, 500)

go cl.publishLoop(publishWriter)
// Clean up the reader after we're done
defer func() {
@@ -75,7 +83,11 @@ func (cl *ClientLogger) ConsumePipelineAndSend() {
for {
err := cl.logFile.ReadMessage(&log)
if err != nil {
cl.logger.Error(os.Stderr, "Error getting message: %s\n", err)
// don't log anything on EOF
if err == io.EOF {
return
}
cl.logger.Errorf("Error getting message: %s\n", err)
return
}
publishWriter <- log
@@ -95,18 +107,19 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
return
}

cl.logSpool.Log(constructLogSpoolMsg(entry))
line := strings.TrimSpace(string(entry.Line))

cl.client.Publish(beat.Event{
Timestamp: time.Unix(0, entry.TimeNano),
Fields: common.MapStr{
"message": line,
"container": common.MapStr{
"labels": helper.DeDotLabels(cl.containerMeta.ContainerLabels, true),
"id": cl.containerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.containerMeta.ContainerName}),
"labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true),
"id": cl.ContainerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
"image": common.MapStr{
"name": cl.containerMeta.ContainerImageName,
"name": cl.ContainerMeta.ContainerImageName,
},
},
},
@@ -115,3 +128,18 @@
}

}

func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
var msg logger.Message

msg.Line = line.Line
msg.Source = line.Source
msg.Timestamp = time.Unix(0, line.TimeNano)
if line.PartialLogMetadata != nil {
msg.PLogMetaData = &backend.PartialLogMetaData{}
msg.PLogMetaData.ID = line.PartialLogMetadata.Id
msg.PLogMetaData.Last = line.PartialLogMetadata.Last
msg.PLogMetaData.Ordinal = int(line.PartialLogMetadata.Ordinal)
}
return &msg
}
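
`CreateReaderForContainer` is defined elsewhere in the pipelinemanager package and isn't shown in this diff. As a rough, hypothetical sketch only (it assumes the local spool is Docker's `jsonfilelog` logger, which implements `logger.LogReader`, and that the daemon consumes the `ReadLogs` stream as uint32-length-delimited protobuf `LogEntry` messages), serving `docker logs` from the local spool could look something like this; `readFromSpool` is a made-up name:

package pipelinemanager

import (
	"encoding/binary"
	"errors"
	"io"

	"github.com/docker/docker/api/types/plugins/logdriver"
	"github.com/docker/docker/daemon/logger"
	protoio "github.com/gogo/protobuf/io"
)

// readFromSpool (hypothetical) asks the local spool for a log watcher and
// re-encodes each stored message as a length-delimited protobuf LogEntry,
// the stream format the daemon expects back from a ReadLogs request.
func readFromSpool(spool logger.Logger, config logger.ReadConfig, w io.Writer) error {
	lr, ok := spool.(logger.LogReader)
	if !ok {
		return errors.New("local spool does not support reading logs")
	}
	watcher := lr.ReadLogs(config)

	enc := protoio.NewUint32DelimitedWriter(w, binary.BigEndian)
	defer enc.Close()

	for msg := range watcher.Msg {
		entry := logdriver.LogEntry{
			Line:     msg.Line,
			Source:   msg.Source,
			TimeNano: msg.Timestamp.UnixNano(),
		}
		if err := enc.WriteMsg(&entry); err != nil {
			return err
		}
	}
	// A production implementation would also honor config.Follow/Tail and
	// signal the watcher when the HTTP client disconnects.
	return nil
}
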
17 changes: 16 additions & 1 deletion x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
@@ -5,13 +5,18 @@
package pipelinemanager

import (
"os"
"path/filepath"
"strconv"
"sync"
"testing"
"time"

"github.com/docker/docker/daemon/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/docker/docker/daemon/logger/jsonfilelog"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock"
@@ -85,7 +90,17 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock
reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString))
require.NoError(t, err)

client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject)
info := logger.Info{
ContainerID: "b87d3b0379f816a5f2f7070f28cc05e2f564a3fb549a67c64ec30fc5b04142ed",
LogPath: filepath.Join("/tmp/dockerbeattest/", strconv.FormatInt(time.Now().Unix(), 10)),
}

err = os.MkdirAll(filepath.Dir(info.LogPath), 0755)
assert.NoError(t, err)
localLog, err := jsonfilelog.New(info)
assert.NoError(t, err)

client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog)
require.NoError(t, err)

return client
2 changes: 1 addition & 1 deletion x-pack/dockerlogbeat/pipelinemanager/config.go
@@ -35,7 +35,7 @@ func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) {
newCfg := ContainerOutputConfig{}
endpoint, ok := input["hosts"]
if !ok {
return newCfg, errors.New("An endpoint flag is required")
return newCfg, errors.New("A hosts flag is required")
}

endpointList := strings.Split(endpoint, ",")