
Polling cluster formation state for master-is-stable health indicator #88397

Merged

Conversation


@masseyke masseyke (Member) commented Jul 8, 2022

As shown in step 1.2.2.4 in the diagram at #85624 (comment), if no node has been elected master for more than 30 seconds, then when a user queries a master-eligible node, that node is supposed to reach out to all of the other master-eligible nodes and summarize the results of their cluster formation knowledge. Since we don't want to block a user request while making those remote calls, the work is done speculatively so that the information is already available when it is needed.

If a master-eligible node gets a cluster-state-changed event in which the master is null, it waits 10 seconds (in case another cluster-state-changed event quickly flips the master back to non-null, which is the most likely scenario) and then reaches out to all of the other master-eligible nodes using a transport action (#87306) to get their cluster formation data. This data is stashed away in case a user request comes in. Since the user won't hit this path until 30 seconds after the master became null (because we tell the user they have a stable master unless it has been missing for more than 30 seconds), we have plenty of time: 10 seconds of delay in case we're notified that there is a new master, plus 10 seconds to get the result back from each master-eligible node before timing out, plus 10 seconds to spare.
Since it includes nested callbacks, this code is a little hard to follow. Here's the outline (a simplified sketch in plain Java follows it):
For each master-eligible node in the cluster, we:

  1. Schedule a task to be performed after 10 seconds that connects to the master-eligible node and returns a handle to a Cancellable. This returns immediately because all of the work is done asynchronously.
    1. On an asynchronous successful response from that call to connect, we send the transport request. This also returns immediately because its work is done asynchronously.
      1. On an asynchronous successful response from the transport request, we close the connection and update the shared map with the result. Then we start back at step 1 again.
      2. On an asynchronous failure from the transport request, we close the connection and update the shared map with an exception. Then we start back at step 1 again.
    2. On an asynchronous failure from connect, we update the shared map with an exception and then start back at step 1 again.
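
To make the nested-callback structure concrete, here is a minimal, self-contained sketch of that loop in plain Java. It is not the PR's actual code: it uses a ScheduledExecutorService and CompletableFuture in place of Elasticsearch's Scheduler and TransportService, the names connectAsync, requestFormationInfoAsync, and FormationState are hypothetical stand-ins, and connection cleanup is omitted for brevity.

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ClusterFormationPollerSketch {
        record FormationState(String summary) {}

        // Holds either a result or the exception that prevented getting one.
        record StateOrException(FormationState state, Throwable exception) {}

        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private final Map<String, StateOrException> results = new ConcurrentHashMap<>();
        private volatile boolean cancelled = false;

        // Step 1: after a 10-second delay, connect to the node and send the request.
        void pollUntilCancelled(String node) {
            if (cancelled) {
                return;
            }
            scheduler.schedule(() -> {
                connectAsync(node)
                    // Step 1.1: on a successful connection, send the transport request.
                    .thenCompose(connection -> requestFormationInfoAsync(connection, node))
                    // Steps 1.1.1, 1.1.2, and 1.2: record the result (or the exception,
                    // if either stage failed) in the shared map, then start back at step 1.
                    .whenComplete((state, exception) -> {
                        results.put(node, new StateOrException(state, exception));
                        pollUntilCancelled(node);
                    });
            }, 10, TimeUnit.SECONDS);
        }

        void cancelPolling() {
            cancelled = true;
        }

        // Hypothetical async stand-ins for connectToNode and the transport action.
        CompletableFuture<Object> connectAsync(String node) {
            return CompletableFuture.completedFuture(new Object());
        }

        CompletableFuture<FormationState> requestFormationInfoAsync(Object connection, String node) {
            return CompletableFuture.completedFuture(new FormationState("example"));
        }
    }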

This PR was originally a part of #88020, but has been broken off to simplify the discussion. This PR only populates the CoordinationDiagnosticsService.clusterFormationResponses field, which will be used by an updated #88020 to drive the master stability logic.

@masseyke masseyke changed the title from "Polling cluster formation state" to "Polling cluster formation state for master-is-stable health indicator" Jul 11, 2022
@masseyke masseyke marked this pull request as ready for review July 11, 2022 21:18
@elasticmachine elasticmachine added the Team:Data Management label Jul 11, 2022
@elasticmachine (Collaborator):

Pinging @elastic/es-data-management (Team:Data Management)

@masseyke masseyke requested review from andreidan and DaveCTurner and removed request for andreidan July 11, 2022 21:18
@elasticsearchmachine (Collaborator):

Hi @masseyke, I've created a changelog YAML for you.

@masseyke masseyke requested a review from andreidan July 11, 2022 21:19
@andreidan (Contributor) left a comment:

Thanks for working on this, Keith.

I've left a few suggestions to (hopefully) simplify it a bit.

Comment on lines 546 to 552
new ActionListenerResponseHandler<>(
ActionListener.runAfter(
ActionListener.runBefore(clusterFormationInfoResponseListener, () -> Releasables.close(releasable)),
() -> new PollClusterFormationStateTask(node, nodeToClusterFormationStateMap, multipleCancellablesWrapper)
.pollUntilCancelled()
),
ClusterFormationInfoAction.Response::new
@andreidan (Contributor):

Could this be simplified?

Even if just documenting the order of execution?

i.e.

close(releasable) -> clusterFormationInfoResponseListener ->  () -> new PollClusterFormationStateTask(node, nodeToClusterFormationStateMap, multipleCancellablesWrapper).pollUntilCancelled()

Hmm, actually, is the runAfter still needed?

Could the last step (() -> new PollClusterFormationStateTask(node, nodeToClusterFormationStateMap, multipleCancellablesWrapper).pollUntilCancelled() ) be executed in the clusterFormationInfoResponseListener.whenComplete callback?

@masseyke (Member, Author):

Could the last step (...) be executed in the clusterFormationInfoResponseListener.whenComplete callback?

It could, but it would have to be done twice (once in the success path and once in the fail path). The nice thing about runAfter and runBefore is that they happen either way (success or failure). I can add in a comment documenting how that works though.
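
For context, here is a simplified sketch of what those two combinators do. The Listener interface below is a stand-in for org.elasticsearch.action.ActionListener, and the real runBefore additionally routes an exception thrown by its runnable into onFailure.

    interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);

        // Like ActionListener.runBefore: run the action first, then delegate --
        // on the success path and on the failure path alike.
        static <T> Listener<T> runBefore(Listener<T> delegate, Runnable action) {
            return new Listener<T>() {
                @Override
                public void onResponse(T response) {
                    action.run();
                    delegate.onResponse(response);
                }

                @Override
                public void onFailure(Exception e) {
                    action.run();
                    delegate.onFailure(e);
                }
            };
        }

        // Like ActionListener.runAfter: delegate first, then run the action --
        // again on both paths.
        static <T> Listener<T> runAfter(Listener<T> delegate, Runnable action) {
            return new Listener<T>() {
                @Override
                public void onResponse(T response) {
                    try {
                        delegate.onResponse(response);
                    } finally {
                        action.run();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        delegate.onFailure(e);
                    } finally {
                        action.run();
                    }
                }
            };
        }
    }

So runAfter(runBefore(listener, closeConnection), reschedule) closes the connection, delivers the result, and then reschedules the poll, regardless of whether the request succeeded or failed.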

*/
public Scheduler.Cancellable pollUntilCancelled() {
StepListener<Releasable> connectionListener = new StepListener<>();
StepListener<ClusterFormationInfoAction.Response> clusterFormationInfoResponseListener = new StepListener<>();
@andreidan (Contributor):

Would this be more readable if named fetchClusterInfoListener? The Response bit is a bit redundant, maybe?

*
* @return
*/
public Scheduler.Cancellable pollUntilCancelled() {
@andreidan (Contributor):

It seems to me that untilCancelled in the method name here is not adding much information? Shall we just call it fetchClusterFormationInfo? It could also receive the node as a parameter?

Whilst here - is there a need for PollClusterFormationStateTask and MultipleCancellablesWrapper to exist?
I think their wrapping of global state makes things a bit more difficult to understand.

Maybe the fetchClusterFormationInfo could also receive a Consumer<ClusterFormationInfoAction.Response> and call that whenever it wants to pass the response to the outside world?
Given it returns a Cancellable the caller could aggregate them all in a list and cancel them without needing a MultipleCancellablesWrapper to aggregate them internally?

the signature I'm proposing is:

Scheduler.Cancellable fetchClusterFormationInfo(DiscoveryNode node, Consumer<ClusterFormationInfoAction.Response> responseConsumer)

@masseyke (Member, Author):

Oh good point about PollClusterFormationStateTask's existence. In an earlier incarnation this was an actual Runnable, and pollUntilCancelled() was just run(). I improved that, but forgot to move the state out of PollClusterFormationStateTask and then get rid of it altogether. I think the signature will have to be a little different though. More on that in an upcoming comment.

@masseyke (Member, Author):

Here's the signature I have now:
void fetchClusterFormationInfo(DiscoveryNode node, ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> nodeToClusterFormationStateMap, List<Scheduler.Cancellable> cancellables)
I can't just return a single Cancellable because one call to the method can generate lots of Cancellables (since the method schedules calls of itself). So I'm passing in the list of Cancellables that cancelPollingClusterFormationInfo acts on. This avoids the need for the MultipleCancellablesWrapper, but it effectively does the same thing. And I passed in a nodeToClusterFormationStateMap because I can't have just a ClusterFormationInfoAction.Response Consumer, since one possible outcome is an Exception rather than a Response. I'll see if making that a Consumer makes it more or less readable.

@masseyke (Member, Author):

OK, the method now accepts a Consumer<ClusterFormationStateOrException> instead of a ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> nodeToClusterFormationStateMap. I think it helps out a little bit. The functionality is unchanged.
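
Such a holder might look roughly like this (a sketch only -- the real ClusterFormationStateOrException is defined in the PR, and ClusterFormationState is a placeholder here):

    // Placeholder for the real coordination-layer type.
    record ClusterFormationState() {}

    // Either-style holder: exactly one of the two fields is non-null.
    record ClusterFormationStateOrException(ClusterFormationState state, Exception exception) {
        ClusterFormationStateOrException {
            if ((state == null) == (exception == null)) {
                throw new IllegalArgumentException("exactly one of state and exception must be non-null");
            }
        }
    }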

@andreidan (Contributor) commented Jul 13, 2022:

I think this is a step in the right direction, thanks for implementing it!

I can't just return a single Cancellable because one call to the method can generate lots of Cancellables (since the method schedules calls of itself)

Would this be an avenue for simplification?

Say the method signature is as follows

Scheduler.Cancellable fetchClusterFormationInfo(DiscoveryNode node, Consumer<ClusterFormationInfoAction.Response> responseConsumer)

This means the caller is responsible for re-calling the fetchClusterFormationInfo method when the responseConsumer is called.

I think this would remove the need for the runAfter call in that runAfter(runBefore(...)) sequence, because it seems we'll want to reschedule irrespective of being able to get the response or not - so when responseConsumer is called we should be rescheduling.
i.e. we always call responseConsumer irrespective of how fetchClusterInfoListener completes:

                new ActionListenerResponseHandler<>(
                    ActionListener.runBefore(fetchClusterInfoListener, () -> Releasables.close(releasable)),
                    ClusterFormationInfoAction.Response::new
                )

...

fetchClusterInfoListener.whenComplete(response -> {
            long endTime = System.nanoTime();
            logger.trace("Received cluster coordination info from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
            responseConsumer.accept(new ClusterFormationStateOrException(response.getClusterFormationState()));
        }, e -> {
            logger.warn("Exception in cluster coordination info request to master node", e);
            responseConsumer.accept(new ClusterFormationStateOrException(e));
        });

Consuming fetchClusterFormationInfo would look something along the lines of:

        getMasterEligibleNodes().forEach(masterNode -> {
            Consumer<ClusterFormationStateOrException> responseConsumer = response -> {
                if (clusterFormationResponses != null) {
                    clusterFormationResponses.put(masterNode, response);
                }
            };
            
            Scheduler.ScheduledCancellable scheduleFetch = fetchClusterFormationInfo(masterNode, responseConsumer.andThen(response -> {
                if (clusterFormationInfoTasks != null) {
                    // reschedule the fetch if it wasn't cancelled already
                    clusterFormationInfoTasks.add(fetchClusterFormationInfo(masterNode, responseConsumer));
                }
            }));
            
            clusterFormationInfoTasks.add(scheduleFetch);
        });

This would reduce the scope of fetchClusterFormationInfo and the need for a Consumer<Cancellable>.

What do you think?

@masseyke (Member, Author):

Unless I'm misunderstanding something, we can't use a Consumer<ClusterFormationInfoAction.Response> here because then we'd have no way of handling exceptions, right? And we need to keep track of exceptions we encounter.
And even using a Consumer<ClusterFormationStateOrException>, I don't think the approach you're suggesting will work as-is, because it misses retries on a connectToNode exception (https://github.com/elastic/elasticsearch/pull/88397/files#diff-6898ba5666bbb351e267e9295e8feb7fbabf0d95d77aa4375c06cb2c16ad0a7dR529). I could probably add in a separate consumer for connectToNode, but at that point it's starting to get even more complex. Is there any use case that is missed by the current code?

@masseyke (Member, Author):

Also, I think your proposal exposes us to the race condition here, right (although I'm not 100% sure that it causes big problems)? #88397 (comment)
But that is fixable.

@andreidan (Contributor) commented Jul 13, 2022:

And even using a Consumer I don't think the approach you're suggesting will work as-is, because it misses retries on a connectToNode exception (https://github.com/elastic/elasticsearch/pull/88397/files#diff-6898ba5666bbb351e267e9295e8feb7fbabf0d95d77aa4375c06cb2c16ad0a7dR529)

Ah, apologies, I did mean Consumer<ClusterFormationStateOrException>. But if connectToNode raises an exception, the response consumer will still be called https://github.com/elastic/elasticsearch/pull/88397/files#diff-6898ba5666bbb351e267e9295e8feb7fbabf0d95d77aa4375c06cb2c16ad0a7dR524 (and the responseConsumer will track the exception and reschedule the fetch). Am I missing something?

Is there any use case that is missed by the current code?

I think it's a bit difficult to follow; that's why I suggested reducing the scope of fetch... and having a clear view over which callback reschedules the polling and when that happens. I'd argue that if the suggestion I proposed above works (I might be missing something), it'll be clear from the method signature that fetch... schedules one Cancellable task and has one consumer that gets a success or an exception (which could be chained to reschedule the fetch via andThen...).

@masseyke (Member, Author):

OK, I was mistaken about where it was failing. As we discussed offline, the actual reason it was failing was because the code at #88397 (comment) would stop polling after 2 attempts. We're now doing something very similar to that, but recursive so that it continues polling until cancelled.
We also discussed offline that I think the code above could throw NullPointerExceptions since clusterFormationInfoTasks and clusterFormationResponses could become null in between the null check and using them. Rather than synchronizing that code, I've put both of those references on the stack so that they're never null.
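
The idiom being described is roughly the following (a sketch; the field and value types are simplified stand-ins):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class NullSafetySketch {
        // Reassigned to a fresh map (or to null) as polling starts and stops.
        private volatile ConcurrentMap<String, Object> clusterFormationResponses = new ConcurrentHashMap<>();

        void recordResponse(String node, Object response) {
            // Racy: the field could be set to null between the check and the put.
            // if (clusterFormationResponses != null) clusterFormationResponses.put(node, response);

            // Safe: read the field once into a local; the local can never become null.
            ConcurrentMap<String, Object> responses = clusterFormationResponses;
            if (responses != null) {
                responses.put(node, response);
            }
        }
    }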

Comment on lines 491 to 493
* This is a reference to the global nodeToClusterFormationStateMap that was current at the time this object was constructed. The
* global map is recreated whenever the task is cancelled. Having this reference prevents accidental writes to that map after
* cancellation.
@andreidan (Contributor):

This is a bit confusing, as it implies we're copying it (i.e. "that was current at the time this object was constructed") but also "references" it... which means it's going to change in line with the global map.

I'm also not sure about the accidental writes to "that" map (it's "this" map, isn't it? :) ), nor what's accidental about the said writes.

Could we maybe talk about why we have this map here, as opposed to the implementation details of how it arrived here? (Apologies if I'm misunderstanding its purpose.)

Update: maybe we should drop PollClusterFormationStateTask altogether, in which case ignore the above :)

@masseyke (Member, Author):

Below is a scenario showing why I'm copying the clusterFormationResponses reference and passing it in rather than using the global clusterFormationResponses. Maybe I'm being unnecessarily cautious.

1. We begin polling master-eligible nodes A, B, C.
2. We create a new global clusterFormationResponses map.
3. We update the global clusterFormationResponses with results from A, B.
4. We cancel polling.
5. We begin polling master-eligible nodes A, B, D (note that it's a different set).
6. We create a new global clusterFormationResponses map.
7. We update the global clusterFormationResponses with a result from C (the write was happening before the cancel took effect, and I don't think cancellation will interrupt a map insert, but maybe I'm wrong).
8. We update the global clusterFormationResponses with results from A, B, D.

So now we have results for A, B, C, and D, even though C is no longer part of the cluster. So we get weird results when we look to see if we can form a quorum, or if C knows about the existence of D. Or even if it's not as extreme as a node being replaced, we have results that might be out of date, causing confusion. My simple way to avoid worrying about all of this was to just pass the method a reference to the clusterFormationResponses map that it is supposed to be using.
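
In code, that pattern amounts to handing each polling round the map it should write to (a sketch with simplified names and types):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class PerRoundMapSketch {
        private volatile ConcurrentMap<String, Object> clusterFormationResponses;

        void beginPollingClusterFormationInfo(Set<String> masterEligibleNodes) {
            // Each polling round gets its own map; the field points at the current one.
            ConcurrentMap<String, Object> thisRoundsResponses = new ConcurrentHashMap<>();
            clusterFormationResponses = thisRoundsResponses;
            for (String node : masterEligibleNodes) {
                // Each task writes only to the map it was handed, so a late write from
                // a cancelled round lands in that round's old map, which nothing reads.
                pollNode(node, thisRoundsResponses);
            }
        }

        void pollNode(String node, ConcurrentMap<String, Object> responses) {
            // Asynchronous polling elided; results would be put into `responses`.
        }
    }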

*/
beginPollingClusterFormationInfo();
} else {
cancelPollingClusterFormationInfo();
@andreidan (Contributor):

IMO we shouldn't be constantly calling this cancel... when the cluster is healthy, but only when re-gaining a master.
@DaveCTurner what do you think?

Contributor:

It's not a huge deal. Maybe we could use null instead of new ConcurrentHashMap<>() to distinguish "we're not polling" from "we're polling but have no entries"?

@masseyke (Member, Author):

My thinking was that it's basically a no-op (there won't be any scheduled tasks most of the time), and the minuscule performance hit was worth avoiding the risk of complicating the code and/or accidentally not calling it when it needed to be called.
I can have cancelPollingClusterFormationInfo set clusterFormationResponses to null and have beginPollingClusterFormationInfo create a new ConcurrentHashMap<>() (I think that's what you're suggesting?) -- that would save a little garbage collection.
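
That change would look something like the following sketch (simplified field and value types):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class PollingLifecycleSketch {
        // null means "we're not polling"; an empty map means "we're polling but have no entries yet".
        private volatile ConcurrentMap<String, Object> clusterFormationResponses = null;

        void beginPollingClusterFormationInfo() {
            clusterFormationResponses = new ConcurrentHashMap<>();
            // ...schedule the per-node polling tasks...
        }

        void cancelPollingClusterFormationInfo() {
            // ...cancel the per-node polling tasks...
            clusterFormationResponses = null;
        }
    }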

@masseyke masseyke requested a review from andreidan July 12, 2022 22:31
@masseyke (Member, Author):

@elasticmachine update branch

@masseyke (Member, Author):

@elasticmachine update branch

@elasticmachine (Collaborator):

expected head sha didn’t match current head ref.

@andreidan (Contributor) left a comment:

Thanks for iterating on this, Keith.

LGTM

* completed, adding the resulting Cancellable to cancellableConsumer.
* @param masterEligibleNode The node being polled
* @param responseConsumer The response consumer to be wrapped
* @param cancellableConsumer The list of Cancellables
@andreidan (Contributor):

This is not accurate anymore.

}

/**
* This wraps the responseConsumer in a Consumer that will run beginPollingClusterFormationInfo() after responseConsumer has
@andreidan (Contributor):

It will run fetchClusterFormationInfo, I believe?

/*
* This test sets up a 4-node cluster (3 master eligible). We call beginPollingClusterFormationInfo() on each node. We then
* cancel all tasks. This simulates what will happen most often in practice -- polling is triggered when the master node goes
* null, and then polling is cancelled when a new master node is elected within 10 seconds. We then simulate the cluster running
@andreidan (Contributor):

"polling is cancelled when a new master node is elected within 10 seconds"

I think polling is cancelled immediately after a master node is detected. Maybe I'm misunderstanding what you meant here by the 10 seconds delay?

@masseyke (Member, Author):

I'll update the wording -- it was meant to read as "polling is cancelled immediately when a new master node is elected, which is usually well within 10 seconds of when we were alerted that the master node went null", or something like that.

@masseyke (Member, Author):

@elasticmachine update branch

@masseyke masseyke merged commit 3c5bf7e into elastic:master Jul 14, 2022
@masseyke masseyke deleted the feature/polling-cluster-formation-state branch July 14, 2022 15:58
weizijun added a commit to weizijun/elasticsearch that referenced this pull request Jul 15, 2022
* upstream/master: (2974 commits)
  Reserved cluster state service (elastic#88527)
  Add transport action immutable state checks (elastic#88491)
  Remove suggest flag from index stats docs (elastic#85479)
  Polling cluster formation state for master-is-stable health indicator (elastic#88397)
  Add test execution guide in yamlRestTest asciidoc (elastic#88490)
  Add troubleshooting guide for corrupt repository (elastic#88391)
  [Transform] Finetune Schedule to be less noisy on retry and retry slower (elastic#88531)
  Updatable API keys - auto-update legacy RDs (elastic#88514)
  Fix typo in TransportForceMergeAction and TransportClearIndicesCacheA… (elastic#88064)
  Fixed NullPointerException on bulk request (elastic#88358)
  Avoid needless index metadata builders during reroute (elastic#88506)
  Set metadata on request in API key noop test (elastic#88507)
  Fix passing positional args to ES in Docker (elastic#88502)
  Improve description for task api detailed param (elastic#88493)
  Support cartesian shape with doc values (elastic#88487)
  Promote usage of Subjects in Authentication class (elastic#88494)
  Add CCx 2.0 feature flag (elastic#88451)
  Reword the watcher 'always' and 'never' condition docs (elastic#86105)
  Simplify azure discovery installation docs (elastic#88404)
  Breakup FIPS CI testing jobs
  ...

# Conflicts:
#	server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
#	x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java