From 05f82e7fdb879ef6af8c344ebc7dc3ca160b88f3 Mon Sep 17 00:00:00 2001
From: Alex K <8418476+fearful-symmetry@users.noreply.github.com>
Date: Thu, 9 Jul 2020 10:11:02 -0700
Subject: [PATCH] Add `docker logs` support to the Elastic Log Driver (#19531)

* init commit of docker logs support
* remove vendor
* fix tests
* mage fmt
* code cleanup
* change logging directories around, add code to remove containers
* remove error message on EOF
* fix config, add docs
* more fixes, migrate to writing logs to docker's own location
* add changelog entry
* docs cleanup
---
 CHANGELOG.next.asciidoc                       |   3 +
 x-pack/dockerlogbeat/config.json              |  24 +++
 .../dockerlogbeat/docs/configuration.asciidoc |  73 ++++++++-
 x-pack/dockerlogbeat/handlers.go              |  50 +++++-
 x-pack/dockerlogbeat/main.go                  |  17 ++-
 .../pipelinemanager/clientLogReader.go        |  67 ++++++---
 .../pipelinemanager/clientLogReader_test.go   |  17 ++-
 .../dockerlogbeat/pipelinemanager/config.go   |   2 +-
 .../pipelinemanager/pipelineManager.go        | 142 +++++++++++++++++-
 x-pack/dockerlogbeat/pipereader/reader.go     |   2 +-
 x-pack/dockerlogbeat/readme.md                |  10 ++
 11 files changed, 371 insertions(+), 36 deletions(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 9c18e744ccf..76ebb7f0002 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -560,6 +560,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add registry and code signature information and ECS categorization fields for sysmon module {pull}18058[18058]
 - Add new winlogbeat security dashboard {pull}18775[18775]
 
+*Elastic Log Driver*
+- Add support for `docker logs` command {pull}19531[19531]
+
 ==== Deprecated
 
 *Affecting all Beats*
 
diff --git a/x-pack/dockerlogbeat/config.json b/x-pack/dockerlogbeat/config.json
index d3072a841bc..5d7d1d3c745 100644
--- a/x-pack/dockerlogbeat/config.json
+++ b/x-pack/dockerlogbeat/config.json
@@ -13,7 +13,31 @@
     ],
     "socket": "beatSocket.sock"
   },
+  "mounts": [
+    {
+      "name": "LOG_DIR",
+      "description": "Mount for local log cache",
+      "destination": "/var/log/docker",
+      "source": "/var/lib/docker",
+      "type": "none",
+      "options": [
+        "rw",
+        "rbind"
+      ],
+      "Settable": [
+        "source"
+      ]
+    }
+  ],
   "env": [
+    {
+      "description": "Destroy logs after a container has stopped",
+      "name": "DESTROY_LOGS_ON_STOP",
+      "value": "false",
+      "Settable": [
+        "value"
+      ]
+    },
     {
       "description": "debug level",
       "name": "LOG_DRIVER_LEVEL",
diff --git a/x-pack/dockerlogbeat/docs/configuration.asciidoc b/x-pack/dockerlogbeat/docs/configuration.asciidoc
index 708eb941a91..f1bf6821489 100644
--- a/x-pack/dockerlogbeat/docs/configuration.asciidoc
+++ b/x-pack/dockerlogbeat/docs/configuration.asciidoc
@@ -49,10 +49,6 @@ format is `"username:password"`.
 [[es-output-options]]
 === {es} output options
 
-// TODO: Add the following settings. Syntax is a little different so we might
-// need to add deameon examples that show how to specify these settings:
-// `output.elasticsearch.indices
-// `output.elasticsearch.pipelines`
 
 [options="header"]
 |=====
@@ -117,3 +113,72 @@ for more information about the environment variables.
 
 |=====
+
+
+[float]
+[[local-log-opts]]
+=== Configuring the local log
+This plugin fully supports `docker logs`, and it maintains a local copy of logs that can be read without a connection to Elasticsearch. The plugin writes these local logs to `/var/log/docker` inside the plugin, a directory that is bindmounted from `/var/lib/docker` on the host. If you want to change the log location on the host, you must change the mount inside the plugin:
+
+1. Disable the plugin:
++
+["source","sh",subs="attributes"]
+----
+docker plugin disable elastic/{log-driver-alias}:{version}
+----
+
+2. Set the bindmount directory:
++
+["source","sh",subs="attributes"]
+----
+docker plugin set elastic/{log-driver-alias}:{version} LOG_DIR.source=NEW_LOG_LOCATION
+----
+
+3. Enable the plugin:
++
+["source","sh",subs="attributes"]
+----
+docker plugin enable elastic/{log-driver-alias}:{version}
+----
+
+
+The local log also supports the `max-file`, `max-size`, and `compress` options that are https://docs.docker.com/config/containers/logging/json-file/#options[a part of the Docker default file logger]. For example:
+
+["source","sh",subs="attributes"]
+----
+docker run --log-driver=elastic/{log-driver-alias}:{version} \
+           --log-opt endpoint="myhost:9200" \
+           --log-opt user="myusername" \
+           --log-opt password="mypassword" \
+           --log-opt max-file=10 \
+           --log-opt max-size=5M \
+           --log-opt compress=true \
+           -it debian:jessie /bin/bash
+----
+
+
+In situations where the local logs can't easily be managed, you can also configure the plugin to remove log files when a container is stopped. You won't be able to read logs from a stopped container, but log files are cleaned up without user intervention. To enable removal of logs for stopped containers, set the `DESTROY_LOGS_ON_STOP` environment variable:
+
+1. Disable the plugin:
++
+["source","sh",subs="attributes"]
+----
+docker plugin disable elastic/{log-driver-alias}:{version}
+----
+
+2. Enable log removal:
++
+["source","sh",subs="attributes"]
+----
+docker plugin set elastic/{log-driver-alias}:{version} DESTROY_LOGS_ON_STOP=true
+----
+
+3. Enable the plugin:
++
+["source","sh",subs="attributes"]
+----
+docker plugin enable elastic/{log-driver-alias}:{version}
+----
+
diff --git a/x-pack/dockerlogbeat/handlers.go b/x-pack/dockerlogbeat/handlers.go
index 604c029e601..8b3a771a741 100644
--- a/x-pack/dockerlogbeat/handlers.go
+++ b/x-pack/dockerlogbeat/handlers.go
@@ -6,12 +6,14 @@ package main
 
 import (
 	"encoding/json"
+	"io"
 	"net/http"
 
 	"github.com/docker/docker/daemon/logger"
 
 	"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemanager"
 
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/pkg/errors"
 )
 
@@ -26,6 +28,26 @@ type StopLoggingRequest struct {
 	File string
 }
 
+// capabilitiesResponse represents the response to a capabilities request
+type capabilitiesResponse struct {
+	Err string
+	Cap logger.Capability
+}
+
+// logsRequest represents the request object we get from a `docker logs` call
+type logsRequest struct {
+	Info   logger.Info
+	Config logger.ReadConfig
+}
+
+// reportCaps handles the LogDriver.Capabilities endpoint and advertises that this driver supports reading logs
+func reportCaps() func(w http.ResponseWriter, r *http.Request) {
+	return func(w http.ResponseWriter, r *http.Request) {
+		json.NewEncoder(w).Encode(&capabilitiesResponse{
+			Cap: logger.Capability{ReadLogs: true},
+		})
+	}
+}
+
 // This gets called when a container starts that requests the log driver
 func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
 	return func(w http.ResponseWriter, r *http.Request) {
@@ -36,7 +58,7 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon
 			return
 		}
 
-		pm.Logger.Infof("Got start request object from container %#v\n", startReq.Info.ContainerName)
+		pm.Logger.Debugf("Got start request object from container %#v\n", startReq.Info.ContainerName)
 		pm.Logger.Debugf("Got a container with the following labels: %#v\n", startReq.Info.ContainerLabels)
 		pm.Logger.Debugf("Got a container with the following log opts: %#v\n", startReq.Info.Config)
 
@@ -67,7 +89,7 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
 			http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
 			return
 		}
-		pm.Logger.Infof("Got stop request object %#v\n", stopReq)
+		pm.Logger.Debugf("Got stop request object %#v\n", stopReq)
 		// Run the stop async, since nothing 'depends' on it,
 		// and we can break people's docker automation if this times out.
 		go func() {
@@ -81,6 +103,30 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
 	} // end func
 }
 
+// readLogHandler handles the LogDriver.ReadLogs endpoint, streaming a container's local logs back to the daemon for `docker logs`
+func readLogHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var logReq logsRequest
+		err := json.NewDecoder(r.Body).Decode(&logReq)
+		if err != nil {
+			http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
+			return
+		}
+
+		pm.Logger.Debugf("Got logging request for container %s\n", logReq.Info.ContainerName)
+		stream, err := pm.CreateReaderForContainer(logReq.Info, logReq.Config)
+		if err != nil {
+			http.Error(w, errors.Wrap(err, "error creating log reader").Error(), http.StatusBadRequest)
+			return
+		}
+		defer stream.Close()
+		w.Header().Set("Content-Type", "application/x-json-stream")
+		wf := ioutils.NewWriteFlusher(w)
+		defer wf.Close()
+		io.Copy(wf, stream)
+
+	} //end func
+}
+
 // For the start/stop handler, the daemon expects back an error object. If the body is empty, then all is well.
 func respondOK(w http.ResponseWriter) {
 	res := struct {
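A note for readers tracing the `ReadLogs` path: despite the `application/x-json-stream` content type, the body that `readLogHandler` streams back is the sequence of uint32-length-prefixed protobuf `LogEntry` frames produced by `CreateReaderForContainer` (see the pipelinemanager diff below). The following is a minimal, hypothetical consumer, not part of this patch; `decodeLogStream` is an illustrative name, and it uses the same gogo/protobuf helpers the plugin imports:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"

	"github.com/docker/docker/api/types/plugins/logdriver"
	protoio "github.com/gogo/protobuf/io"
)

// decodeLogStream reads uint32-length-prefixed LogEntry frames, the framing
// the daemon expects from a LogDriver.ReadLogs response, until EOF.
func decodeLogStream(r io.Reader) error {
	dec := protoio.NewUint32DelimitedReader(r, binary.BigEndian, 2e6)
	defer dec.Close()

	var entry logdriver.LogEntry
	for {
		if err := dec.ReadMsg(&entry); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		fmt.Printf("[%s] %s\n", entry.Source, entry.Line)
		entry.Reset()
	}
}

func main() {
	// e.g. pipe in a response body captured from the plugin socket
	if err := decodeLogStream(os.Stdin); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```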
diff --git a/x-pack/dockerlogbeat/main.go b/x-pack/dockerlogbeat/main.go
index 360fd265caa..e3a5b8d0310 100644
--- a/x-pack/dockerlogbeat/main.go
+++ b/x-pack/dockerlogbeat/main.go
@@ -7,6 +7,7 @@ package main
 import (
 	"fmt"
 	"os"
+	"strconv"
 
 	"github.com/docker/go-plugins-helpers/sdk"
 
@@ -41,6 +42,14 @@ func genNewMonitoringConfig() (*common.Config, error) {
 	return cfg, nil
 }
 
+// setDestroyLogsOnStop returns the DESTROY_LOGS_ON_STOP setting from the plugin environment, defaulting to false when unset
+func setDestroyLogsOnStop() (bool, error) {
+	setting, ok := os.LookupEnv("DESTROY_LOGS_ON_STOP")
+	if !ok {
+		return false, nil
+	}
+	return strconv.ParseBool(setting)
+}
+
 func fatal(format string, vs ...interface{}) {
 	fmt.Fprintf(os.Stderr, format, vs...)
 	os.Exit(1)
@@ -60,12 +69,18 @@ func main() {
 		fatal("error starting log handler: %s", err)
 	}
 
-	pipelines := pipelinemanager.NewPipelineManager(logcfg)
+	logDestroy, err := setDestroyLogsOnStop()
+	if err != nil {
+		fatal("DESTROY_LOGS_ON_STOP must be 'true' or 'false': %s", err)
+	}
+	pipelines := pipelinemanager.NewPipelineManager(logDestroy)
 
 	sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
 	// Create handlers for startup and shutdown of the log driver
 	sdkHandler.HandleFunc("/LogDriver.StartLogging", startLoggingHandler(pipelines))
 	sdkHandler.HandleFunc("/LogDriver.StopLogging", stopLoggingHandler(pipelines))
+	sdkHandler.HandleFunc("/LogDriver.Capabilities", reportCaps())
+	sdkHandler.HandleFunc("/LogDriver.ReadLogs", readLogHandler(pipelines))
 
 	err = sdkHandler.ServeUnix("beatSocket", 0)
 	if err != nil {
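One detail worth knowing about `setDestroyLogsOnStop`: validation is delegated to `strconv.ParseBool`, which accepts a few more spellings than the `fatal` message suggests. A quick standalone illustration, not part of the patch:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// ParseBool accepts 1, 0, t, f, T, F, and true/false in lower, upper,
	// or title case; anything else (e.g. "yes") is the error case that
	// main() treats as fatal.
	for _, v := range []string{"true", "FALSE", "1", "0", "t", "F", "yes"} {
		b, err := strconv.ParseBool(v)
		fmt.Printf("%q -> %v (err: %v)\n", v, b, err)
	}
}
```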
diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
index 62d54c88b6a..1a82dd214e5 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
@@ -5,13 +5,15 @@
 package pipelinemanager
 
 import (
-	"os"
+	"io"
 	"strings"
 	"time"
 
 	"github.com/docker/docker/api/types/plugins/logdriver"
 	"github.com/docker/docker/daemon/logger"
 
+	"github.com/docker/docker/api/types/backend"
+
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/common/acker"
@@ -20,20 +22,26 @@ import (
 	"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader"
 )
 
-// ClientLogger is an instance of a pipeline logger client meant for reading from a single log stream
-// There's a many-to-one relationship between clients and pipelines.
-// Each container with the same config will get its own client to the same pipeline.
+// ClientLogger collects logs for a docker container logging to stdout and stderr, using the FIFO provided by the docker daemon.
+// Each log line is written to a local log file for retrieval via "docker logs", and forwarded to the beats publisher pipeline.
+// The local log storage is based on the docker json-file logger and supports the same settings. If "max-size" is not configured, we will rotate the log file every 10MB.
 type ClientLogger struct {
-	logFile       *pipereader.PipeReader
-	client        beat.Client
-	pipelineHash  uint64
-	closer        chan struct{}
-	containerMeta logger.Info
-	logger        *logp.Logger
+	// pipelineHash is a hash of the libbeat publisher pipeline config
+	pipelineHash uint64
+	// logger is the internal error message logger
+	logger *logp.Logger
+	// ContainerMeta is the metadata object for the container we get from docker
+	ContainerMeta logger.Info
+	// logFile is the FIFO reader that reads from the docker container stdio
+	logFile *pipereader.PipeReader
+	// client is the libbeat client object that sends logs upstream
+	client beat.Client
+	// localLog manages the local JSON logs for containers
+	localLog logger.Logger
 }
 
 // newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
-func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info) (*ClientLogger, error) {
+func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
 	// setup the beat client
 	settings := beat.ClientConfig{
 		WaitClose: 0,
@@ -50,7 +58,12 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade
 
 	clientLogger.Debugf("Created new logger for %d", hash)
 
-	return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hash, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil
+	return &ClientLogger{logFile: inputFile,
+		client:        client,
+		pipelineHash:  hash,
+		ContainerMeta: info,
+		localLog:      localLog,
+		logger:        clientLogger}, nil
 }
 
 // Close closes the pipeline client and reader
@@ -64,7 +77,6 @@ func (cl *ClientLogger) Close() error {
 // ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client
 func (cl *ClientLogger) ConsumePipelineAndSend() {
 	publishWriter := make(chan logdriver.LogEntry, 500)
-
 	go cl.publishLoop(publishWriter)
 	// Clean up the reader after we're done
 	defer func() {
@@ -76,7 +88,10 @@ func (cl *ClientLogger) ConsumePipelineAndSend() {
 	for {
 		err := cl.logFile.ReadMessage(&log)
 		if err != nil {
-			cl.logger.Error(os.Stderr, "Error getting message: %s\n", err)
+			if err == io.EOF {
+				return
+			}
+			cl.logger.Errorf("Error getting message: %s\n", err)
 			return
 		}
 		publishWriter <- log
@@ -96,6 +111,7 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
 			return
 		}
 
+		cl.localLog.Log(constructLogSpoolMsg(entry))
 		line := strings.TrimSpace(string(entry.Line))
 
 		cl.client.Publish(beat.Event{
@@ -103,11 +119,11 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
 			Fields: common.MapStr{
 				"message": line,
 				"container": common.MapStr{
-					"labels": helper.DeDotLabels(cl.containerMeta.ContainerLabels, true),
-					"id":     cl.containerMeta.ContainerID,
-					"name":   helper.ExtractContainerName([]string{cl.containerMeta.ContainerName}),
+					"labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true),
+					"id":     cl.ContainerMeta.ContainerID,
+					"name":   helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
 					"image": common.MapStr{
-						"name": cl.containerMeta.ContainerImageName,
+						"name": cl.ContainerMeta.ContainerImageName,
 					},
 				},
 			},
 		})
@@ -116,3 +132,18 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
 	}
 
 }
+
+// constructLogSpoolMsg converts a protobuf LogEntry from the container FIFO into a logger.Message for the local json-file log
+func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
+	var msg logger.Message
+
+	msg.Line = line.Line
+	msg.Source = line.Source
+	msg.Timestamp = time.Unix(0, line.TimeNano)
+	if line.PartialLogMetadata != nil {
+		msg.PLogMetaData = &backend.PartialLogMetaData{}
+		msg.PLogMetaData.ID = line.PartialLogMetadata.Id
+		msg.PLogMetaData.Last = line.PartialLogMetadata.Last
+		msg.PLogMetaData.Ordinal = int(line.PartialLogMetadata.Ordinal)
+	}
+	return &msg
+}
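The `localLog` that `publishLoop` feeds via `constructLogSpoolMsg` is docker's own json-file logger, created by the pipeline manager (see its diff below) with `max-size: 10M` and `max-file: 5` as defaults. A self-contained sketch of that flow, assuming a writable placeholder path and container ID:

```go
package main

import (
	"os"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/jsonfilelog"
)

func main() {
	// The parent directory must exist, just as CreateClientWithConfig
	// ensures with os.MkdirAll; the path and ID here are placeholders.
	if err := os.MkdirAll("/tmp/dockerbeat-example", 0755); err != nil {
		panic(err)
	}

	info := logger.Info{
		ContainerID: "example-container-id",
		LogPath:     "/tmp/dockerbeat-example/example-json.log",
		Config: map[string]string{
			"max-size": "10M", // same defaults the pipeline manager applies
			"max-file": "5",
		},
	}

	local, err := jsonfilelog.New(info)
	if err != nil {
		panic(err)
	}
	defer local.Close()

	// The same shape constructLogSpoolMsg builds from a FIFO LogEntry.
	msg := &logger.Message{
		Line:      []byte("hello from the local spool"),
		Source:    "stdout",
		Timestamp: time.Now(),
	}
	if err := local.Log(msg); err != nil {
		panic(err)
	}
}
```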
diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
index 4f396b194be..b53d26e234d 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
@@ -5,13 +5,18 @@
 package pipelinemanager
 
 import (
+	"os"
+	"path/filepath"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/docker/docker/daemon/logger"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"github.com/docker/docker/daemon/logger/jsonfilelog"
+
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock"
@@ -85,7 +90,17 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock
 	reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString))
 	require.NoError(t, err)
 
-	client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject)
+	info := logger.Info{
+		ContainerID: "b87d3b0379f816a5f2f7070f28cc05e2f564a3fb549a67c64ec30fc5b04142ed",
+		LogPath:     filepath.Join("/tmp/dockerbeattest", time.Now().Format("20060102150405")),
+	}
+
+	err = os.MkdirAll(filepath.Dir(info.LogPath), 0755)
+	assert.NoError(t, err)
+	localLog, err := jsonfilelog.New(info)
+	assert.NoError(t, err)
+
+	client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog)
 	require.NoError(t, err)
 
 	return client
diff --git a/x-pack/dockerlogbeat/pipelinemanager/config.go b/x-pack/dockerlogbeat/pipelinemanager/config.go
index 3da18bc8546..92d6e98ee9f 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/config.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/config.go
@@ -35,7 +35,7 @@ func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) {
 	newCfg := ContainerOutputConfig{}
 	endpoint, ok := input["hosts"]
 	if !ok {
-		return newCfg, errors.New("An endpoint flag is required")
+		return newCfg, errors.New("A hosts flag is required")
 	}
 
 	endpointList := strings.Split(endpoint, ",")
diff --git a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
index e96caa77863..b1d04d16541 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
@@ -5,7 +5,11 @@
 package pipelinemanager
 
 import (
+	"encoding/binary"
 	"fmt"
+	"io"
+	"os"
+	"path/filepath"
 	"sync"
 
 	"github.com/mitchellh/hashstructure"
@@ -14,7 +18,11 @@ import (
 	"github.com/pkg/errors"
 
+	"github.com/docker/docker/api/types/plugins/logdriver"
 	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/daemon/logger/jsonfilelog"
+
+	protoio "github.com/gogo/protobuf/io"
 
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/logp"
@@ -41,14 +49,23 @@ type PipelineManager struct {
 	pipelines map[uint64]*Pipeline
 	// clients config: filepath
 	clients map[string]*ClientLogger
+	// clientLogger key: container ID
+	clientLogger map[string]logger.Logger
+	// logDirectory is the bindmount for local container logs
+	logDirectory string
+	// destroyLogsOnStop tells the manager to remove a container's log files when the container stops
+	destroyLogsOnStop bool
 }
 
 // NewPipelineManager creates a new Pipeline map
-func NewPipelineManager(logCfg *common.Config) *PipelineManager {
+func NewPipelineManager(logDestroy bool) *PipelineManager {
 	return &PipelineManager{
-		Logger:    logp.NewLogger("PipelineManager"),
-		pipelines: make(map[uint64]*Pipeline),
-		clients:   make(map[string]*ClientLogger),
+		Logger:            logp.NewLogger("PipelineManager"),
+		pipelines:         make(map[uint64]*Pipeline),
+		clients:           make(map[string]*ClientLogger),
+		clientLogger:      make(map[string]logger.Logger),
+		logDirectory:      "/var/log/docker/containers",
+		destroyLogsOnStop: logDestroy,
 	}
 }
 
@@ -62,13 +79,16 @@ func (pm *PipelineManager) CloseClientWithFile(file string) error {
 
 	hash := cl.pipelineHash
 
+	// remove the logger
+	pm.removeLogger(cl.ContainerMeta)
+
 	pm.Logger.Debugf("Closing Client first from pipelineManager")
 	err = cl.Close()
 	if err != nil {
 		return errors.Wrap(err, "error closing client")
 	}
 
-	//if the pipeline is no longer in use, clean up
+	// if the pipeline is no longer in use, clean up
 	pm.removePipelineIfNeeded(hash)
 
 	return nil
@@ -89,20 +109,90 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
 
 	reader, err := pipereader.NewReaderFromPath(file)
 	if err != nil {
-		return nil, errors.Wrap(err, "")
+		return nil, errors.Wrap(err, "error creating reader for docker log stream")
 	}
 
+	// info.LogPath is empty by default; fall back to the json-file logger's path layout under our log directory
+	if info.LogPath == "" {
+		info.LogPath = filepath.Join(pm.logDirectory, info.ContainerID, fmt.Sprintf("%s-json.log", info.ContainerID))
+	}
+	err = os.MkdirAll(filepath.Dir(info.LogPath), 0755)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating directory for local logs")
+	}
+	// set a default log size
+	if _, ok := info.Config["max-size"]; !ok {
+		info.Config["max-size"] = "10M"
+	}
+	// set a default log count
+	if _, ok := info.Config["max-file"]; !ok {
+		info.Config["max-file"] = "5"
+	}
+
+	localLog, err := jsonfilelog.New(info)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating local log")
+	}
+
 	//actually get to crafting the new client.
-	cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info)
+	cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating client")
 	}
 
 	pm.registerClient(cl, hashstring, file)
+	pm.registerLogger(localLog, info)
 	return cl, nil
 }
 
+// CreateReaderForContainer responds to docker logs requests to pull local logs from the json logger
+func (pm *PipelineManager) CreateReaderForContainer(info logger.Info, config logger.ReadConfig) (io.ReadCloser, error) {
+	logObject, exists := pm.getLogger(info)
+	if !exists {
+		return nil, fmt.Errorf("could not find logger for %s", info.ContainerID)
+	}
+	logReader, ok := logObject.(logger.LogReader)
+	if !ok {
+		return nil, fmt.Errorf("logger does not support reading")
+	}
+	pipeReader, pipeWriter := io.Pipe()
+
+	go func() {
+		watcher := logReader.ReadLogs(config)
+
+		enc := protoio.NewUint32DelimitedWriter(pipeWriter, binary.BigEndian)
+		defer enc.Close()
+		defer watcher.ConsumerGone()
+
+		var rawLog logdriver.LogEntry
+		for {
+			select {
+			case msg, ok := <-watcher.Msg:
+				if !ok {
+					pipeWriter.Close()
+					return
+				}
+				rawLog.Line = msg.Line
+				rawLog.Partial = msg.PLogMetaData != nil
+				rawLog.TimeNano = msg.Timestamp.UnixNano()
+				rawLog.Source = msg.Source
+
+				if err := enc.WriteMsg(&rawLog); err != nil {
+					pipeWriter.CloseWithError(err)
+					return
+				}
+
+			case err := <-watcher.Err:
+				pipeWriter.CloseWithError(err)
+				return
+			}
+		}
+	}()
+
+	return pipeReader, nil
+}
+
 //===================
 // Private methods
 
@@ -134,6 +224,13 @@ func (pm *PipelineManager) getClient(file string) (*ClientLogger, bool) {
 	return cli, exists
 }
 
+// getLogger fetches the local log object for a given container
+func (pm *PipelineManager) getLogger(info logger.Info) (logger.Logger, bool) {
+	pm.mu.Lock()
+	defer pm.mu.Unlock()
+	log, exists := pm.clientLogger[info.ContainerID]
+	return log, exists
+}
+
 // removePipeline removes a pipeline from the manager if its refcount is zero.
 func (pm *PipelineManager) removePipelineIfNeeded(hash uint64) {
 	pm.mu.Lock()
@@ -161,6 +258,35 @@ func (pm *PipelineManager) registerClient(cl *ClientLogger, hash uint64, clientF
 	pm.pipelines[hash].refCount++
 }
 
+// registerLogger registers a local logger used for reading back logs
+func (pm *PipelineManager) registerLogger(log logger.Logger, info logger.Info) {
+	pm.mu.Lock()
+	defer pm.mu.Unlock()
+	pm.clientLogger[info.ContainerID] = log
+}
+
+// removeLogger removes a logging instance
+func (pm *PipelineManager) removeLogger(info logger.Info) {
+	pm.mu.Lock()
+	defer pm.mu.Unlock()
+	log, exists := pm.clientLogger[info.ContainerID]
+	if !exists {
+		return
+	}
+	log.Close()
+	delete(pm.clientLogger, info.ContainerID)
+	if pm.destroyLogsOnStop {
+		pm.removeLogFile(info.ContainerID)
+	}
+}
+
+// removeLogFile removes the local logs for a given container. Disabled by default.
+func (pm *PipelineManager) removeLogFile(id string) error {
+	toRemove := filepath.Join(pm.logDirectory, id)
+	// the logs live in a per-container directory, so remove it recursively
+	return os.RemoveAll(toRemove)
+}
+
 // removeClient deregisters a client
 func (pm *PipelineManager) removeClient(file string) (*ClientLogger, error) {
 	pm.mu.Lock()
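As background for the `hashstring` used in `CreateClientWithConfig`: the manager deduplicates pipelines by hashing the output config with `mitchellh/hashstructure`, so containers started with identical `--log-opt` settings share one beats pipeline. An illustrative sketch of that behavior, using a simplified stand-in for `ContainerOutputConfig` (not the plugin's actual type):

```go
package main

import (
	"fmt"

	"github.com/mitchellh/hashstructure"
)

// outputConfig is a simplified stand-in for pipelinemanager.ContainerOutputConfig.
type outputConfig struct {
	Endpoint []string
	User     string
}

func main() {
	a, _ := hashstructure.Hash(outputConfig{[]string{"myhost:9200"}, "elastic"}, nil)
	b, _ := hashstructure.Hash(outputConfig{[]string{"myhost:9200"}, "elastic"}, nil)
	c, _ := hashstructure.Hash(outputConfig{[]string{"otherhost:9200"}, "elastic"}, nil)

	// Identical configs hash identically, and therefore share one pipeline.
	fmt.Println(a == b, a == c) // true false
}
```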
diff --git a/x-pack/dockerlogbeat/pipereader/reader.go b/x-pack/dockerlogbeat/pipereader/reader.go
index f622fb03ce6..d1f8eb05c21 100644
--- a/x-pack/dockerlogbeat/pipereader/reader.go
+++ b/x-pack/dockerlogbeat/pipereader/reader.go
@@ -54,7 +54,7 @@ func (reader *PipeReader) ReadMessage(log *logdriver.LogEntry) error {
 	for {
 		lenFrame, err = reader.getValidLengthFrame()
 		if err != nil {
-			return errors.Wrap(err, "error getting length frame")
+			return err
 		}
 		if lenFrame <= reader.maxSize {
 			break
diff --git a/x-pack/dockerlogbeat/readme.md b/x-pack/dockerlogbeat/readme.md
index b6c97035c53..be06d96daa9 100644
--- a/x-pack/dockerlogbeat/readme.md
+++ b/x-pack/dockerlogbeat/readme.md
@@ -48,3 +48,13 @@ The location of the logs AND the container base directory in the docker docs is
 
 You can use this to find the list of plugins running on the host: `runc --root /containers/services/docker/rootfs/run/docker/plugins/runtime-root/plugins.moby/ list`
 
 The logs are in `/var/log/docker`. If you want to make the logs useful, you need to find the ID of the plugin. Back on the darwin host, run `docker plugin inspect $name_of_plugin | grep Id`, then use the hash ID to grep through the logs: `grep 22bb02c1506677cd48cc1cfccc0847c1b602f48f735e51e4933001804f86e2e docker.*`
+
+
+## Local logs
+
+This plugin fully supports `docker logs`, and it maintains a local copy of logs that can be read without a connection to Elasticsearch. Unfortunately, due to the limitations in the docker plugin API, we can't "clean up" log files when a container is destroyed. The plugin bindmounts the `/var/lib/docker` directory on the host and writes its local logs there. This mount point can be changed via [Docker](https://docs.docker.com/engine/reference/commandline/plugin_set/#change-the-source-of-a-mount). The plugin can also be configured to do a "hard" cleanup and destroy logs when a container stops. To enable this, set the `DESTROY_LOGS_ON_STOP` environment variable inside the plugin:
+```
+docker plugin set d805664c550e DESTROY_LOGS_ON_STOP=true
+```
+
+You can also set `max-file`, `max-size`, and `compress` via `--log-opt`.
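For example, an illustrative `docker run` using those options, mirroring the docs example above (`$PLUGIN_NAME` is a placeholder for the plugin's name and tag):

```
docker run --log-driver=$PLUGIN_NAME \
  --log-opt max-file=10 \
  --log-opt max-size=5M \
  --log-opt compress=true \
  -it debian:jessie /bin/bash
```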