From 5cb70ce4bfdf10cb8840985de0566aa7c61fa074 Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Mon, 18 Nov 2019 09:38:51 -0800 Subject: [PATCH] [docker log driver] Move pipeline check + creation to single atomic operation (#14518) * move pipeline check + creation to single atomic operation --- .../pipelinemanager/pipelineManager.go | 59 +++++++------------ 1 file changed, 20 insertions(+), 39 deletions(-) diff --git a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go index 79e75ed6ec8..68703d0159f 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go +++ b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go @@ -74,25 +74,15 @@ func (pm *PipelineManager) CloseClientWithFile(file string) error { func (pm *PipelineManager) CreateClientWithConfig(logOptsConfig map[string]string, file string) (*ClientLogger, error) { hashstring := makeConfigHash(logOptsConfig) - - //If we don't have an existing pipeline for this hash, make one - exists := pm.checkIfHashExists(logOptsConfig) - var pipeline *Pipeline - var err error - if !exists { - pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger) - if err != nil { - return nil, errors.Wrap(err, "error loading pipeline") - } - pm.registerPipeline(pipeline, hashstring) - } else { - pipeline, _ = pm.getPipeline(hashstring) + pipeline, err := pm.getOrCreatePipeline(logOptsConfig, file, hashstring) + if err != nil { + return nil, errors.Wrap(err, "error getting pipeline") } //actually get to crafting the new client. cl, err := newClientFromPipeline(pipeline.pipeline, file, hashstring) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error creating client") } pm.registerClient(cl, hashstring, file) @@ -103,12 +93,24 @@ func (pm *PipelineManager) CreateClientWithConfig(logOptsConfig map[string]strin //=================== // Private methods -// getPipeline gets a pipeline based on a confighash -func (pm *PipelineManager) getPipeline(hashstring string) (*Pipeline, bool) { +// checkAndCreatePipeline performs the pipeline check and creation as one atomic operation +// It will either return a new pipeline, or an existing one from the pipeline map +func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig map[string]string, file string, hashstring string) (*Pipeline, error) { pm.mu.Lock() defer pm.mu.Unlock() - pipeline, exists := pm.pipelines[hashstring] - return pipeline, exists + + var pipeline *Pipeline + var err error + pipeline, test := pm.pipelines[hashstring] + if !test { + pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger) + if err != nil { + return nil, errors.Wrap(err, "error loading pipeline") + } + pm.pipelines[hashstring] = pipeline + } + + return pipeline, nil } // getClient gets a pipeline client based on a file handle @@ -119,27 +121,6 @@ func (pm *PipelineManager) getClient(file string) (*ClientLogger, bool) { return cli, exists } -// checkIfHashExists is a short atomic function to see if a pipeline alread exists inside the PM. Thread-safe. -func (pm *PipelineManager) checkIfHashExists(logOptsConfig map[string]string) bool { - hashstring := makeConfigHash(logOptsConfig) - pm.mu.Lock() - defer pm.mu.Unlock() - _, test := pm.pipelines[hashstring] - if test { - return true - } - return false -} - -// registerPipeline is a small atomic function that registers a new pipeline with the managers -// TODO: What happens if we try to register a pipeline that already exists? Which pipeline "wins"? -func (pm *PipelineManager) registerPipeline(pipeline *Pipeline, hashstring string) { - pm.mu.Lock() - defer pm.mu.Unlock() - pm.pipelines[hashstring] = pipeline - -} - // removePipeline removes a pipeline from the manager if it's refcount is zero. func (pm *PipelineManager) removePipelineIfNeeded(hash string) { pm.mu.Lock()