Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delete pipeline in registry #12414

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion logstash-core/lib/logstash/pipeline_action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
require "logstash/pipeline_action/create"
require "logstash/pipeline_action/stop"
require "logstash/pipeline_action/reload"
require "logstash/pipeline_action/delete"

module LogStash module PipelineAction
ORDERING = {
LogStash::PipelineAction::Create => 100,
LogStash::PipelineAction::Reload => 200,
LogStash::PipelineAction::Stop => 300
LogStash::PipelineAction::Stop => 300,
LogStash::PipelineAction::Delete => 400
}
end end
38 changes: 38 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/delete.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require "logstash/pipeline_action/base"

module LogStash module PipelineAction
class Delete < Base
attr_reader :pipeline_id

def initialize(pipeline_id)
@pipeline_id = pipeline_id
end

def execute(agent, pipelines_registry)
success = pipelines_registry.delete_pipeline(@pipeline_id)

LogStash::ConvergeResult::ActionResult.create(self, success)
end

def to_s
"PipelineAction::Delete<#{pipeline_id}>"
end
end
end end
27 changes: 26 additions & 1 deletion logstash-core/lib/logstash/pipelines_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def put(pipeline_id, state)
def remove(pipeline_id)
@lock.synchronize do
@states.delete(pipeline_id)
@locks.delete(pipeline_id)
Copy link
Contributor Author

@kaisecheng kaisecheng Nov 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please review the locking mechanism.
the idea is to keep locks[pipeline_id] forever for create, reload, stop and delete to stay mutually exclusive

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO @locks.delete(pipeline_id) need to stay there because keeping @locks[pipeline_id] does not make sense if not also keeping @states[pipeline_id]. Once a pipeline is removed, it's pipeline_id should not exist anymore in the registry at all.
I am not sure I understand in which condition it would be useful to keep it around?

Copy link
Contributor Author

@kaisecheng kaisecheng Nov 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread A, B and C want to update @State of pipeline_id 1

A B C
t1 lock_1 = get_lock(pid: 1)
t2 lock_1 = get_lock(pid: 1)
t3 lock_1.lock
t4 update @State
t5 remove @locks[pid: 1]
t6 lock_1.unlock
t7 lock1.lock
t8 update @State lock_1_new = get_lock(pid: 1)
t9 lock_1_new.lock
t10 update @State

A removes lock in @locks. B holds the old lock
C gets a new lock for pipeline_id 1 as A removed it. B and C have the right to update @State of pipeline_id 1

I think the purpose of @locks is to ensure only one thread can edit the same pipeline_id state simultaneously.
If @locks keeps the lock, it can keep the integrity of action A,B,C

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha! 💡 I see what you mean. I think you are right here. Let me go over this a bit more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So yes +1 on this change per your reasoning above. The downside here is a memory leak for removed pipelines that will never be recreated but practically speaking the potential for this becoming a problem is extremely low.
I guess that the alternative would be to rethink/refactor the locking logic but that does not seem necessary.

I would probably add a comment about this and maybe link it to this discussion here so that if this code is revisited in the future it is more explicit in the code.

end
end

Expand Down Expand Up @@ -209,6 +208,32 @@ def reload_pipeline(pipeline_id, &reload_block)
lock.unlock
end

# Delete the pipeline that is terminated
# @param pipeline_id [String, Symbol] the pipeline id
# @return [Boolean] pipeline delete success
def delete_pipeline(pipeline_id)
lock = @states.get_lock(pipeline_id)
lock.lock

state = @states.get(pipeline_id)

if state.nil?
logger.error("Attempted to delete a pipeline that does not exists", :pipeline_id => pipeline_id)
return false
end

if state.terminated?
@states.remove(pipeline_id)
logger.info("Removed pipeline from registry successfully", :pipeline_id => pipeline_id)
return true
else
logger.info("Attempted to delete a pipeline that is not terminated", :pipeline_id => pipeline_id)
return false
end
ensure
lock.unlock
end

# @param pipeline_id [String, Symbol] the pipeline id
# @return [Pipeline] the pipeline object or nil if none for pipeline_id
def get_pipeline(pipeline_id)
Expand Down
8 changes: 7 additions & 1 deletion logstash-core/lib/logstash/state_resolver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ def resolve(pipelines_registry, pipeline_configs)
end
end

configured_pipelines = pipeline_configs.map { |config| config.pipeline_id.to_sym }
configured_pipelines = pipeline_configs.map { |config| config.pipeline_id.to_sym }.to_set
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming to_set here is to dedup values but under which condition are there duplicates in this collection??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the set is for O(1) configured_pipelines.include?. configured_pipelines was a Array

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, ok. But TBH given the small number of pipelines we are dealing with this extra to_set conversion is practically & possibly more costly than the actual include? iterations.

To push that micro optimization further I would try to avoid the Array -> Array -> Set and directly produce a Hash or Set using inject or each_with_object ?

Something like?

configured_pipelines = pipeline_configs.each_with_object(Set.new) { |config, set| set.add(config.pipeline_id.to_sym) }


# If one of the running pipeline is not in the pipeline_configs, we assume that we need to
# stop it.
pipelines_registry.running_pipelines.keys
.select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
.each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) }

# If one of the stopping pipeline is not in the pipeline_configs, we assume that we need to
# delete it in registry.
pipelines_registry.non_running_pipelines.keys
.select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
.each { |pipeline_id| actions << LogStash::PipelineAction::Delete.new(pipeline_id)}

actions.sort # See logstash/pipeline_action.rb
end
end
Expand Down
32 changes: 32 additions & 0 deletions logstash-core/spec/logstash/pipelines_registry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,38 @@
end
end

context "deleting a pipeline" do
context "when pipeline is in registry" do
before :each do
subject.create_pipeline(pipeline_id, pipeline) { true }
end

it "should not delete pipeline if pipeline is not terminated" do
expect(pipeline).to receive(:finished_execution?).and_return(false)
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
expect(logger).to receive(:info)
expect(subject.delete_pipeline(pipeline_id)).to be_falsey
expect(subject.get_pipeline(pipeline_id)).not_to be_nil
end

it "should delete pipeline if pipeline is terminated" do
expect(pipeline).to receive(:finished_execution?).and_return(true)
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
expect(logger).to receive(:info)
expect(subject.delete_pipeline(pipeline_id)).to be_truthy
expect(subject.get_pipeline(pipeline_id)).to be_nil
end
end

context "when pipeline is not in registry" do
it "should log error" do
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
expect(logger).to receive(:error)
expect(subject.delete_pipeline(pipeline_id)).to be_falsey
end
end
end

context "pipelines collections" do
context "with a non terminated pipelines" do
before :each do
Expand Down
40 changes: 40 additions & 0 deletions logstash-core/spec/logstash/state_resolver_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,45 @@
end
end
end

context "when a pipeline stops" do
let(:main_pipeline) { mock_pipeline(:main) }
let(:main_pipeline_config) { main_pipeline.pipeline_config }
let(:pipelines) do
r = LogStash::PipelinesRegistry.new
r.create_pipeline(:main, main_pipeline) { true }
r
end

before do
expect(main_pipeline).to receive(:finished_execution?).at_least(:once).and_return(true)
end

context "when pipeline config contains a new one and the existing" do
let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] }

it "creates the new one and keep the other one stop" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:create, :hello_world])
expect(pipelines.non_running_pipelines.size).to eq(1)
end
end

context "when pipeline config contains an updated pipeline" do
let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] }

it "should reload the stopped pipeline" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:reload, :main])
end
end

context "when pipeline config contains no pipeline" do
let(:pipeline_configs) { [] }

it "should delete the stopped one" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:delete, :main])
end
end
end

end
end