Skip to content

Commit

Permalink
[docker log driver] Move pipeline check + creation to single atomic o…
Browse files Browse the repository at this point in the history
…peration (elastic#14518)

* move pipeline check + creation to single atomic operation
fearful-symmetry authored Nov 18, 2019
1 parent 7fee4d2 commit 5cb70ce
Showing 1 changed file with 20 additions and 39 deletions.
59 changes: 20 additions & 39 deletions x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
Original file line number Diff line number Diff line change
@@ -74,25 +74,15 @@ func (pm *PipelineManager) CloseClientWithFile(file string) error {
func (pm *PipelineManager) CreateClientWithConfig(logOptsConfig map[string]string, file string) (*ClientLogger, error) {

hashstring := makeConfigHash(logOptsConfig)

//If we don't have an existing pipeline for this hash, make one
exists := pm.checkIfHashExists(logOptsConfig)
var pipeline *Pipeline
var err error
if !exists {
pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger)
if err != nil {
return nil, errors.Wrap(err, "error loading pipeline")
}
pm.registerPipeline(pipeline, hashstring)
} else {
pipeline, _ = pm.getPipeline(hashstring)
pipeline, err := pm.getOrCreatePipeline(logOptsConfig, file, hashstring)
if err != nil {
return nil, errors.Wrap(err, "error getting pipeline")
}

//actually get to crafting the new client.
cl, err := newClientFromPipeline(pipeline.pipeline, file, hashstring)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "error creating client")
}

pm.registerClient(cl, hashstring, file)
@@ -103,12 +93,24 @@ func (pm *PipelineManager) CreateClientWithConfig(logOptsConfig map[string]strin
//===================
// Private methods

// getPipeline gets a pipeline based on a confighash
func (pm *PipelineManager) getPipeline(hashstring string) (*Pipeline, bool) {
// checkAndCreatePipeline performs the pipeline check and creation as one atomic operation
// It will either return a new pipeline, or an existing one from the pipeline map
func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig map[string]string, file string, hashstring string) (*Pipeline, error) {
pm.mu.Lock()
defer pm.mu.Unlock()
pipeline, exists := pm.pipelines[hashstring]
return pipeline, exists

var pipeline *Pipeline
var err error
pipeline, test := pm.pipelines[hashstring]
if !test {
pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger)
if err != nil {
return nil, errors.Wrap(err, "error loading pipeline")
}
pm.pipelines[hashstring] = pipeline
}

return pipeline, nil
}

// getClient gets a pipeline client based on a file handle
@@ -119,27 +121,6 @@ func (pm *PipelineManager) getClient(file string) (*ClientLogger, bool) {
return cli, exists
}

// checkIfHashExists is a short atomic function to see if a pipeline alread exists inside the PM. Thread-safe.
func (pm *PipelineManager) checkIfHashExists(logOptsConfig map[string]string) bool {
hashstring := makeConfigHash(logOptsConfig)
pm.mu.Lock()
defer pm.mu.Unlock()
_, test := pm.pipelines[hashstring]
if test {
return true
}
return false
}

// registerPipeline is a small atomic function that registers a new pipeline with the managers
// TODO: What happens if we try to register a pipeline that already exists? Which pipeline "wins"?
func (pm *PipelineManager) registerPipeline(pipeline *Pipeline, hashstring string) {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.pipelines[hashstring] = pipeline

}

// removePipeline removes a pipeline from the manager if it's refcount is zero.
func (pm *PipelineManager) removePipelineIfNeeded(hash string) {
pm.mu.Lock()

0 comments on commit 5cb70ce

Please sign in to comment.