-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove I/O pool blocking sniffing call from onFailure callback, add some logic around host exclusion #27985
Remove I/O pool blocking sniffing call from onFailure callback, add some logic around host exclusion #27985
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,8 +29,11 @@ | |
import java.io.IOException; | ||
import java.security.AccessController; | ||
import java.security.PrivilegedAction; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.ThreadFactory; | ||
|
@@ -53,15 +56,16 @@ public class Sniffer implements Closeable { | |
|
||
private final Task task; | ||
|
||
Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) { | ||
this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay); | ||
Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay, int maxExcludedRounds) { | ||
this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay, maxExcludedRounds); | ||
} | ||
|
||
/** | ||
* Triggers a new sniffing round and explicitly takes out the failed host provided as argument | ||
*/ | ||
public void sniffOnFailure(HttpHost failedHost) { | ||
this.task.sniffOnFailure(failedHost); | ||
this.task.failedHosts.putIfAbsent(failedHost, 0L); | ||
this.task.scheduleNextRun(0); | ||
} | ||
|
||
@Override | ||
|
@@ -75,15 +79,24 @@ private static class Task implements Runnable { | |
|
||
private final long sniffIntervalMillis; | ||
private final long sniffAfterFailureDelayMillis; | ||
private final int maxExcludedRounds; | ||
private final ScheduledExecutorService scheduledExecutorService; | ||
private final AtomicBoolean running = new AtomicBoolean(false); | ||
private ScheduledFuture<?> scheduledFuture; | ||
private ConcurrentHashMap<HttpHost, Long> failedHosts = new ConcurrentHashMap<>(); | ||
|
||
private Task( | ||
HostsSniffer hostsSniffer, | ||
RestClient restClient, | ||
long sniffIntervalMillis, | ||
long sniffAfterFailureDelayMillis, | ||
int maxExcludedRounds) { | ||
|
||
private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffIntervalMillis, long sniffAfterFailureDelayMillis) { | ||
this.hostsSniffer = hostsSniffer; | ||
this.restClient = restClient; | ||
this.sniffIntervalMillis = sniffIntervalMillis; | ||
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis; | ||
this.maxExcludedRounds = maxExcludedRounds; | ||
SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME); | ||
this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory); | ||
scheduleNextRun(0); | ||
|
@@ -106,35 +119,63 @@ synchronized void scheduleNextRun(long delayMillis) { | |
|
||
@Override | ||
public void run() { | ||
sniff(null, sniffIntervalMillis); | ||
} | ||
|
||
void sniffOnFailure(HttpHost failedHost) { | ||
sniff(failedHost, sniffAfterFailureDelayMillis); | ||
sniff(sniffIntervalMillis); | ||
} | ||
|
||
void sniff(HttpHost excludeHost, long nextSniffDelayMillis) { | ||
void sniff(long nextSniffDelayMillis) { | ||
if (running.compareAndSet(false, true)) { | ||
long nextSniffDelay = nextSniffDelayMillis; | ||
try { | ||
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts(); | ||
logger.debug("sniffed hosts: " + sniffedHosts); | ||
if (excludeHost != null) { | ||
sniffedHosts.remove(excludeHost); | ||
} | ||
if (sniffedHosts.isEmpty()) { | ||
|
||
List<HttpHost> hostsFiltered = removeExcludedAndCycle(sniffedHosts); | ||
logger.debug("sniffed hosts after filtering: " + sniffedHosts); | ||
|
||
if (hostsFiltered.isEmpty()) { | ||
logger.warn("no hosts to set, hosts will be updated at the next sniffing round"); | ||
} else { | ||
this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()])); | ||
this.restClient.setHosts(hostsFiltered.toArray(new HttpHost[hostsFiltered.size()])); | ||
} | ||
} catch (Exception e) { | ||
logger.error("error while sniffing nodes", e); | ||
nextSniffDelay = sniffAfterFailureDelayMillis; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea behind |
||
} finally { | ||
scheduleNextRun(nextSniffDelayMillis); | ||
scheduleNextRun(nextSniffDelay); | ||
running.set(false); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Remove excluded hosts from the list of all sniffed hosts, and cycle through the map. Hosts in the map remain | ||
* there for {@link org.elasticsearch.client.sniff.Sniffer.Task#maxExcludedRounds} cycles | ||
* @param allHosts the list of all sniffed hosts | ||
* @return a new list containing the remaining hosts | ||
*/ | ||
private List<HttpHost> removeExcludedAndCycle(List<HttpHost> allHosts) { | ||
final List<HttpHost> excluded = Collections.list(failedHosts.keys()); | ||
|
||
if (excluded.isEmpty()) { | ||
return allHosts; | ||
} | ||
|
||
try { | ||
List<HttpHost> copy = new ArrayList<>(allHosts); | ||
copy.removeAll(excluded); | ||
return copy; | ||
} finally { | ||
for (HttpHost host : excluded) { | ||
long excludedCycles = failedHosts.get(host) + 1; | ||
if (excludedCycles >= maxExcludedRounds) { | ||
failedHosts.remove(host); | ||
} else { | ||
failedHosts.put(host, excludedCycles); | ||
} | ||
} | ||
} | ||
} | ||
|
||
synchronized void shutdown() { | ||
scheduledExecutorService.shutdown(); | ||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect that having to pass the next interval which differs from the usual interval is the reason why this was not async in the first place :) we need to find a way to do that though, we currently lose that behaviour with this change.
Another idea, maybe overkill, could be to change the hosts sniffer API to be async and accept a listener as an argument.