-
Notifications
You must be signed in to change notification settings - Fork 897
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Rearch] Use memcached for sending messages to workers #15471
[Rearch] Use memcached for sending messages to workers #15471
Conversation
This pull request is not mergeable. Please rebase and repush. |
c266896
to
4ad30dc
Compare
4ad30dc
to
1053c9b
Compare
1053c9b
to
6a536af
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, I realize you didn't ask for my review, but I am giving it anyway! 😤
But in all seriousness, this seems to coincide with some of the research I am doing around SIGTERM
with workers (or rather, how MiqServer currently tells workers to exit), and specifically that the same message interface is used for killing workers currently as it is for syncing configs:
I don't think it is necessary to do anything for in this PR, especially since the workers will be killed through OpenShift/kubernetes I believe (not sure what we have in place for that so far). But if any messages might be needed to be passed around, this would probably be how we would do it (that said, some of my review comments suggest getting rid of a shared mechanism for this, so those are dependent on if message passing is really needed).
Anyway, just some things I noticed while taking a look at what you have done here.
@@ -147,8 +147,22 @@ def sync_needed? | |||
start_apache if roles_changed && apache_needed? | |||
|
|||
reset_queue_messages if config_changed || roles_changed | |||
|
|||
update_sync_timestamp(@last_sync) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be gated with if ENV["WORKER_HEARTBEAT_METHOD"]
, otherwise these messages won't get processed, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this isn't really a "message to be processed", and just a single key/val... so maybe not so relevant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I figured it wouldn't hurt to always set the timestamp on memcached even if the worker is still getting its messages from Drb.
|
||
def key_store | ||
@key_store ||= Dalli::Client.new(MiqMemcached.server_address, :namespace => "server_monitor") | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is used in both the server and the worker models, should this be tossed into a module/mixin?
One thing I would say is we probably want to split this into three modules (don't hate me... 😅 ):
MiqWorkerMonitorMemcache::Connection
MiqWorkerMonitorMemcache::Reader
MiqWorkerMonitorMemcache::Writer
Names are a total WIP, but the idea being that they would share the connection stuff, but probably only the worker_monitor
would be writing to Memcache
, while the workers would just be reading (otherwise there is mutex stuff you would have to deal with). You might need to for other messages though, but that depends on how we implement some things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Is it premature? My hope was that we'd know better about how this would look when we figure out how and if we'd use memcached/redis/etc. to tell a worker (who's caught a SIGINT/SIGTERM and continued finishing the queue work it was doing) to exit. Either way, @NickLaMuro's research into how we let workers gracefully finish their work might drive the interface for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@NickLaMuro's research into how we let workers gracefully finish their work might drive the interface for this.
Most of that work should be handled here: #15818
And that said, I don't know that there are any messages we would have to worry about doing this for after the above, so I would agree that my suggestion is probably premature knowing that.
app/models/miq_worker/runner.rb
Outdated
def get_messages | ||
messages = [] | ||
now = Time.now.utc | ||
@my_last_config_change ||= now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we simplify these to lines to this?
@my_last_config_change ||= Time.now.utc
That way, we are only calling Time.now.utc
if @my_last_config_change
isn't set, and it doesn't look like the now
variable is being used elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
_log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}") | ||
messages << ["sync_config"] | ||
|
||
@my_last_config_change = last_config_change |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we change this to this maybe?
@my_last_config_change = Time.now.utc
Reason being, is you do a >
check in the if, and if you just set it to the same time, then it will just continue calling sync_config
every time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, I'll make that change, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I think this is ok the way it is. When @my_last_config_change
and last_config_change
are the same it won't call sync_config
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gtanzillo You are right, I had read the if
statement like:
@my_last_config_change < last_config_change
Instead of what you have. Yours works correctly, so lets just stick with that.
@@ -374,6 +375,32 @@ def heartbeat_to_drb | |||
def heartbeat_to_file(timeout = nil) | |||
timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout | |||
File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s) | |||
|
|||
get_messages.each { |msg, *args| process_message(msg, *args) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we see a need going through all of the extra work of looping through an array of messages if there will only ever be one type we will be handling via memcache? We could simplify get_messages
to something like this:
def sync_config_if_needed
@my_last_config_change ||= Time.now.utc
last_config_change = server_last_change(:last_config_change)
if last_config_change && last_config_change > @my_last_config_change
_log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}")
sync_config
@my_last_config_change = Time.now.utc
end
end
And just change this line to sync_config_if_needed
? No new array needed, and we will just exit early if the if
in the above isn't satisfied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this PR, I was just looking to get to functional equivalence with messaging through Drb. We can start evolving this in future PRs as we start removing Drb from the workers.
…m key store with last change timestamp
6a536af
to
829ce96
Compare
Some comments on commits gtanzillo/manageiq@3be2350~...829ce96 spec/models/miq_worker/runner_spec.rb
|
Checked commits gtanzillo/manageiq@3be2350~...829ce96 with ruby 2.2.6, rubocop 0.47.1, and haml-lint 0.20.0 app/models/miq_worker/runner.rb
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. @NickLaMuro what do you think?
YOLO MERGING. I believe @NickLaMuro told me he was good with the changes yesterday... If not, we can fix in a followup PR. |
@jrafanie I am mostly good with the changes, but I did want to have @gtanzillo address the test using |
context "#get_messages" do | ||
before do | ||
allow_any_instance_of(MiqWorker::Runner).to receive(:worker_initialization) | ||
@worker_base = MiqWorker::Runner.new |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly you could have added this to the top of the spec:
module MiqWorkerForSpecs
module Runner < MiqWorker::Runner
def initialize(cfg ={})
end
end
end
Then these two lines could have changed to this single line:
@worker_base = MiqWorkerForSpecs::Runner.new
get_messages.each { |msg, *args| process_message(msg, *args) } | ||
end | ||
|
||
def get_messages |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change this to fetch_messages
maybe to avoid the rubocop warning.
Left comments for future fixup in a later PR. |
Use memcached and a timestamp as a mechanism for telling workers to synchronize their config and role without going through a Drb connection.
When the worker monitor detects that they have changes it stores the timestamp in memcached in the key that corresponds to the thing that changes. The workers check memcachd each time they heartbeat and if the timestamp is greater than the timestamp of the last sync it will fabricate the message that corresponds to the thing that changed and process it.