diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 8d78f9c838e38..4b8936198911e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -131,13 +131,13 @@ public void onMaster() { logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob"); } - // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running + // Submit a job that will reschedule itself after running threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob()); try { if (clusterService.state().getNodes().getDataNodes().size() > 1) { // Submit an info update job to be run immediately - threadPool.executor(executorName()).execute(() -> maybeRefresh()); + threadPool.executor(executorName()).execute(this::maybeRefresh); } } catch (EsRejectedExecutionException ex) { logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex); @@ -173,7 +173,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (logger.isDebugEnabled()) { logger.debug("data node was added, retrieving new cluster info"); } - threadPool.executor(executorName()).execute(() -> maybeRefresh()); + threadPool.executor(executorName()).execute(this::maybeRefresh); } if (this.isMaster && event.nodesRemoved()) { @@ -316,7 +316,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { ShardStats[] stats = indicesStatsResponse.getShards(); ImmutableOpenMap.Builder newShardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder newShardRoutingToDataPath = ImmutableOpenMap.builder(); - buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state()); + buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath); shardSizes = newShardSizes.build(); shardRoutingToDataPath = newShardRoutingToDataPath.build(); } @@ -365,7 +365,7 @@ public void onFailure(Exception e) { } static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder newShardSizes, - ImmutableOpenMap.Builder newShardRoutingToDataPath, ClusterState state) { + ImmutableOpenMap.Builder newShardRoutingToDataPath) { for (ShardStats s : stats) { newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath()); long size = s.getStats().getStore().sizeInBytes(); diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 2e460f0bf9717..67023f00f22db 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -379,7 +379,9 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { if (logger.isTraceEnabled()) { logger.trace("{}, scheduling a reroute", reason); } - routingService.reroute(reason); + routingService.reroute(reason, ActionListener.wrap( + r -> logger.trace("{}, reroute completed", reason), + e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e))); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java 
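The ShardStateAction hunk above shows the recurring call-site pattern in this patch: calls that used to be fire-and-forget reroute(String) now pass a listener whose callbacks only log the outcome. Below is a minimal, self-contained sketch of that shape; the Reroutable and RerouteListener types are invented here purely for illustration (in the real patch the second argument is an org.elasticsearch.action.ActionListener<Void> built with ActionListener.wrap), so treat the names as assumptions rather than the patch's API.

import java.util.function.Consumer;

// Stand-in for the listener-based reroute signature introduced by this change.
interface Reroutable {
    void reroute(String reason, RerouteListener listener);
}

// Simplified analogue of ActionListener<Void>, defined only to keep the sketch self-contained.
interface RerouteListener {
    void onResponse(Void ignored);
    void onFailure(Exception e);

    // Mirrors the idea of ActionListener.wrap(onSuccess, onError): build a listener from two lambdas.
    static RerouteListener wrap(Consumer<Void> onSuccess, Consumer<Exception> onError) {
        return new RerouteListener() {
            @Override
            public void onResponse(Void ignored) {
                onSuccess.accept(ignored);
            }

            @Override
            public void onFailure(Exception e) {
                onError.accept(e);
            }
        };
    }
}

class RerouteCallSite {
    // A previously fire-and-forget caller adapted to the new signature: success is merely traced,
    // failure is logged and swallowed, preserving the behaviour of the old void reroute(String).
    static void scheduleReroute(Reroutable routingService, String reason) {
        routingService.reroute(reason, RerouteListener.wrap(
            r -> System.out.println(reason + ": reroute completed"),
            e -> System.err.println(reason + ": reroute failed: " + e)));
    }
}

The point of this wrapper is that existing callers keep their fire-and-forget semantics, while the new signature lets callers that care (such as DiskThresholdMonitor later in this diff) observe when the reroute actually completes or fails.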
index ce805f8b84a39..4227006d68f1f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -80,7 +80,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -146,13 +145,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. * @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster. * @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In - * production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}. + * production code this calls + * {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}. */ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, SeedHostsProvider seedHostsProvider, ClusterApplier clusterApplier, Collection> onJoinValidators, Random random, - Consumer reroute, ElectionStrategy electionStrategy) { + BiConsumer> reroute, ElectionStrategy electionStrategy) { this.settings = settings; this.transportService = transportService; this.masterService = masterService; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 7b16e9a096d5e..0f0c6e6e6b883 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -62,7 +62,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -92,7 +91,7 @@ public class JoinHelper { JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, BiConsumer joinHandler, Function joinLeaderInTerm, - Collection> joinValidators, Consumer reroute) { + Collection> joinValidators, BiConsumer> reroute) { this.masterService = masterService; this.transportService = transportService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index d439cff2c7ac6..ab0eeba6bcd1e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -20,6 +20,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import 
org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.NotMasterException; @@ -36,7 +37,6 @@ import java.util.Collections; import java.util.List; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -45,7 +45,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor reroute; + private final BiConsumer> reroute; public static class Task { @@ -82,7 +82,7 @@ public boolean isFinishElectionTask() { private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_"; } - public JoinTaskExecutor(AllocationService allocationService, Logger logger, Consumer reroute) { + public JoinTaskExecutor(AllocationService allocationService, Logger logger, BiConsumer> reroute) { this.allocationService = allocationService; this.logger = logger; this.reroute = reroute; @@ -149,7 +149,10 @@ public ClusterTasksResult execute(ClusterState currentState, List jo results.success(joinTask); } if (nodesChanged) { - reroute.accept("post-join reroute"); + reroute.accept("post-join reroute", ActionListener.wrap( + r -> logger.trace("post-join reroute completed"), + e -> logger.debug("post-join reroute failed", e))); + return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build())); } else { // we must return a new cluster state instance to force publishing. This is important diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 89e19e02b30ed..7068f9079050f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -22,16 +22,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; /** * A {@link RoutingService} listens to clusters state. 
When this service @@ -51,14 +55,16 @@ public class RoutingService extends AbstractLifecycleComponent { private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute"; private final ClusterService clusterService; - private final AllocationService allocationService; + private final BiFunction reroute; - private AtomicBoolean rerouting = new AtomicBoolean(); + private final Object mutex = new Object(); + @Nullable // null if no reroute is currently pending + private PlainListenableActionFuture pendingRerouteListeners; @Inject - public RoutingService(ClusterService clusterService, AllocationService allocationService) { + public RoutingService(ClusterService clusterService, BiFunction reroute) { this.clusterService = clusterService; - this.allocationService = allocationService; + this.reroute = reroute; } @Override @@ -76,34 +82,55 @@ protected void doClose() { /** * Initiates a reroute. */ - public final void reroute(String reason) { - try { - if (lifecycle.stopped()) { - return; - } - if (rerouting.compareAndSet(false, true) == false) { - logger.trace("already has pending reroute, ignoring {}", reason); + public final void reroute(String reason, ActionListener listener) { + if (lifecycle.started() == false) { + listener.onFailure(new IllegalStateException( + "rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]")); + return; + } + final PlainListenableActionFuture currentListeners; + synchronized (mutex) { + if (pendingRerouteListeners != null) { + logger.trace("already has pending reroute, adding [{}] to batch", reason); + pendingRerouteListeners.addListener(listener); return; } - logger.trace("rerouting {}", reason); + currentListeners = PlainListenableActionFuture.newListenableFuture(); + currentListeners.addListener(listener); + pendingRerouteListeners = currentListeners; + } + logger.trace("rerouting [{}]", reason); + try { clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH) { @Override public ClusterState execute(ClusterState currentState) { - rerouting.set(false); - return allocationService.reroute(currentState, reason); + synchronized (mutex) { + assert pendingRerouteListeners == currentListeners; + pendingRerouteListeners = null; + } + return reroute.apply(currentState, reason); } @Override public void onNoLongerMaster(String source) { - rerouting.set(false); - // no biggie + synchronized (mutex) { + if (pendingRerouteListeners == currentListeners) { + pendingRerouteListeners = null; + } + } + currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled")); + // no big deal, the new master will reroute again } @Override public void onFailure(String source, Exception e) { - rerouting.set(false); - ClusterState state = clusterService.state(); + synchronized (mutex) { + if (pendingRerouteListeners == currentListeners) { + pendingRerouteListeners = null; + } + } + final ClusterState state = clusterService.state(); if (logger.isTraceEnabled()) { logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}", source, state), e); @@ -111,12 +138,22 @@ public void onFailure(String source, Exception e) { logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]", source, state.version()), e); } + currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e)); + } + + @Override + public void clusterStateProcessed(String source, 
ClusterState oldState, ClusterState newState) { + currentListeners.onResponse(null); } }); } catch (Exception e) { - rerouting.set(false); + synchronized (mutex) { + assert pendingRerouteListeners == currentListeners; + pendingRerouteListeners = null; + } ClusterState state = clusterService.state(); logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e); + currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e)); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 4badab5a0cafa..96e4974b9b4a8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -21,12 +21,20 @@ import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongSupplier; import java.util.function.Supplier; import com.carrotsearch.hppc.ObjectLookupContainer; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; @@ -54,11 +62,15 @@ public class DiskThresholdMonitor { private final Client client; private final Set nodeHasPassedWatermark = Sets.newConcurrentHashSet(); private final Supplier clusterStateSupplier; - private long lastRunNS; + private final LongSupplier currentTimeMillisSupplier; + private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); + private final AtomicBoolean checkInProgress = new AtomicBoolean(); + private final SetOnce>> rerouteAction = new SetOnce<>(); public DiskThresholdMonitor(Settings settings, Supplier clusterStateSupplier, ClusterSettings clusterSettings, - Client client) { + Client client, LongSupplier currentTimeMillisSupplier) { this.clusterStateSupplier = clusterStateSupplier; + this.currentTimeMillisSupplier = currentTimeMillisSupplier; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; } @@ -92,88 +104,129 @@ private void warnAboutDiskIfNeeded(DiskUsage usage) { } } + private void checkFinished() { + final boolean checkFinished = checkInProgress.compareAndSet(true, false); + assert checkFinished; + } public void onNewInfo(ClusterInfo info) { - ImmutableOpenMap usages = info.getNodeLeastAvailableDiskUsages(); - if (usages != null) { - boolean reroute = false; - String explanation = ""; - - // Garbage collect nodes that have been removed from the cluster - // from the map that tracks watermark crossing - ObjectLookupContainer nodes = usages.keys(); - for (String node : nodeHasPassedWatermark) { - if (nodes.contains(node) == false) { - nodeHasPassedWatermark.remove(node); - } + + assert rerouteAction.get() != null; + + if (checkInProgress.compareAndSet(false, true) == false) { + logger.info("skipping monitor as a check is already in progress"); + return; + 
} + + final ImmutableOpenMap usages = info.getNodeLeastAvailableDiskUsages(); + if (usages == null) { + checkFinished(); + return; + } + + boolean reroute = false; + String explanation = ""; + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + + // Garbage collect nodes that have been removed from the cluster + // from the map that tracks watermark crossing + final ObjectLookupContainer nodes = usages.keys(); + for (String node : nodeHasPassedWatermark) { + if (nodes.contains(node) == false) { + nodeHasPassedWatermark.remove(node); } - ClusterState state = clusterStateSupplier.get(); - Set indicesToMarkReadOnly = new HashSet<>(); - for (ObjectObjectCursor entry : usages) { - String node = entry.key; - DiskUsage usage = entry.value; - warnAboutDiskIfNeeded(usage); - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() || - usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { - RoutingNode routingNode = state.getRoutingNodes().node(node); - if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?! - for (ShardRouting routing : routingNode) { - indicesToMarkReadOnly.add(routing.index().getName()); - } + } + final ClusterState state = clusterStateSupplier.get(); + final Set indicesToMarkReadOnly = new HashSet<>(); + + for (final ObjectObjectCursor entry : usages) { + final String node = entry.key; + final DiskUsage usage = entry.value; + warnAboutDiskIfNeeded(usage); + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { + final RoutingNode routingNode = state.getRoutingNodes().node(node); + if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?! 
+ for (ShardRouting routing : routingNode) { + indicesToMarkReadOnly.add(routing.index().getName()); } - } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || - usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { - if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) { - lastRunNS = System.nanoTime(); + } + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { + reroute = true; + explanation = "high disk watermark exceeded on one or more nodes"; + } else { + logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", + node, diskThresholdSettings.getRerouteInterval()); + } + nodeHasPassedWatermark.add(node); + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { + nodeHasPassedWatermark.add(node); + } else { + if (nodeHasPassedWatermark.contains(node)) { + // The node has previously been over the high or + // low watermark, but is no longer, so we should + // reroute so any unassigned shards can be allocated + // if they are able to be + if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { reroute = true; - explanation = "high disk watermark exceeded on one or more nodes"; + explanation = "one or more nodes has gone under the high or low watermark"; + nodeHasPassedWatermark.remove(node); } else { - logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + + logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + "in the last [{}], skipping reroute", node, diskThresholdSettings.getRerouteInterval()); } - nodeHasPassedWatermark.add(node); - } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() || - usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { - nodeHasPassedWatermark.add(node); - } else { - if (nodeHasPassedWatermark.contains(node)) { - // The node has previously been over the high or - // low watermark, but is no longer, so we should - // reroute so any unassigned shards can be allocated - // if they are able to be - if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) { - lastRunNS = System.nanoTime(); - reroute = true; - explanation = "one or more nodes has gone under the high or low watermark"; - nodeHasPassedWatermark.remove(node); - } else { - logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + - "in the last [{}], skipping reroute", - node, diskThresholdSettings.getRerouteInterval()); - } - } } } - if (reroute) { - logger.info("rerouting shards: [{}]", explanation); - reroute(); - } - indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index)); - if (indicesToMarkReadOnly.isEmpty() == false) { - markIndicesReadOnly(indicesToMarkReadOnly); - } } + + final ActionListener listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2); + + if (reroute) { + logger.info("rerouting shards: [{}]", explanation); 
+ rerouteAction.get().accept(ActionListener.wrap(r -> { + setLastRunTimeMillis(); + listener.onResponse(r); + }, e -> { + logger.debug("reroute failed", e); + setLastRunTimeMillis(); + listener.onFailure(e); + })); + } else { + listener.onResponse(null); + } + + indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index)); + if (indicesToMarkReadOnly.isEmpty() == false) { + markIndicesReadOnly(indicesToMarkReadOnly, ActionListener.wrap(r -> { + setLastRunTimeMillis(); + listener.onResponse(r); + }, e -> { + logger.debug("marking indices readonly failed", e); + setLastRunTimeMillis(); + listener.onFailure(e); + })); + } else { + listener.onResponse(null); + } + } + + private void setLastRunTimeMillis() { + lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong())); } - protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { + protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { // set read-only block but don't block on the response - client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)). - setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute(); + client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()) + .execute(ActionListener.map(listener, r -> null)); } - protected void reroute() { - // Execute an empty reroute, but don't block on the response - client.admin().cluster().prepareReroute().execute(); + public void setRerouteAction(BiConsumer> rerouteAction) { + this.rerouteAction.set(listener -> rerouteAction.accept("disk threshold monitor", listener)); } } diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 82627cfdc0b82..6543d5d1174e2 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -21,6 +21,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -137,7 +139,9 @@ class InternalAsyncFetch extends AsyncShardFetch @Override protected void reroute(ShardId shardId, String reason) { logger.trace("{} scheduling reroute for {}", shardId, reason); - routingService.reroute("async_shard_fetch"); + routingService.reroute("async_shard_fetch", ActionListener.wrap( + r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason), + e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e))); } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 3fab82c3e9eb9..f9c083ad5b0ad 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -365,10 +365,10 @@ protected Node( resourcesToClose.add(clusterService); final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, 
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); - final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, - clusterService.getClusterSettings(), client); + final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state, + clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, - listener::onNewInfo); + diskThresholdMonitor::onNewInfo); final UsageService usageService = new UsageService(); ModulesBuilder modules = new ModulesBuilder(); @@ -499,7 +499,7 @@ protected Node( RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(), metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings()); - final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()); + final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()::reroute); final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(), clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class), @@ -508,6 +508,7 @@ protected Node( transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, searchTransportService); + diskThresholdMonitor.setRerouteAction(routingService::reroute); final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index fcccaf6c0f021..55eae6fc0e95f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -119,7 +119,7 @@ public void testFillShardLevelInfo() { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder routingToPath = ImmutableOpenMap.builder(); ClusterState state = ClusterState.builder(new ClusterName("blarg")).version(0).build(); - InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath, state); + InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath); assertEquals(2, shardSizes.size()); assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0))); assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1))); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index c1186c6bd8699..70567da1aadfb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -57,7 +57,7 @@ public void testJoinDeduplication() { x -> localNode, null, 
Collections.emptySet()); JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), s -> {}); + Collections.emptyList(), (s, r) -> {}); transportService.start(); DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); @@ -153,7 +153,7 @@ public void testJoinValidationRejectsMismatchedClusterUUID() { x -> localNode, null, Collections.emptySet()); new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), s -> {}); // registers request handler + Collections.emptyList(), (s, r) -> {}); // registers request handler transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 8f7648130e8da..37145e991b6c1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -174,7 +174,7 @@ transportService, writableRegistry(), () -> new InMemoryPersistedState(term, initialState), r -> emptyList(), new NoOpClusterApplier(), Collections.emptyList(), - random, s -> {}, ElectionStrategy.DEFAULT_INSTANCE); + random, (s, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE); transportService.start(); transportService.acceptIncomingRequests(); transport = capturingTransport; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java new file mode 100644 index 0000000000000..5368c1c5544d9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java @@ -0,0 +1,170 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.startsWith; + +public class RoutingServiceTests extends ESTestCase { + + private ThreadPool threadPool; + private ClusterService clusterService; + + @Before + public void beforeTest() { + threadPool = new TestThreadPool("test"); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + } + + @After + public void afterTest() { + clusterService.stop(); + threadPool.shutdown(); + } + + public void testRejectionUnlessStarted() { + final RoutingService routingService = new RoutingService(clusterService, (s, r) -> s); + final PlainActionFuture future = new PlainActionFuture<>(); + + if (randomBoolean()) { + routingService.start(); + routingService.stop(); + } else if (randomBoolean()) { + routingService.close(); + } + + routingService.reroute("test", future); + assertTrue(future.isDone()); + assertThat(expectThrows(IllegalStateException.class, future::actionGet).getMessage(), + startsWith("rejecting delayed reroute [test] in state [")); + } + + public void testReroutesWhenRequested() throws InterruptedException { + final AtomicLong rerouteCount = new AtomicLong(); + final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + rerouteCount.incrementAndGet(); + return s; + }); + + routingService.start(); + + long rerouteCountBeforeReroute = 0L; + final int iterations = between(1, 100); + final CountDownLatch countDownLatch = new CountDownLatch(iterations); + for (int i = 0; i < iterations; i++) { + rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get()); + routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + } + countDownLatch.await(10, TimeUnit.SECONDS); + assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get())); + } + + public void testBatchesReroutesTogether() throws BrokenBarrierException, InterruptedException { + final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + clusterService.submitStateUpdateTask("block master service", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + cyclicBarrier.await(); // notify test that we are blocked + cyclicBarrier.await(); // wait to be unblocked by test + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError(source, e); + } + }); + + cyclicBarrier.await(); // wait for master thread to be 
blocked + + final AtomicBoolean rerouteExecuted = new AtomicBoolean(); + final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + assertTrue(rerouteExecuted.compareAndSet(false, true)); // only called once + return s; + }); + + routingService.start(); + + final int iterations = between(1, 100); + final CountDownLatch countDownLatch = new CountDownLatch(iterations); + for (int i = 0; i < iterations; i++) { + routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + } + + cyclicBarrier.await(); // allow master thread to continue; + countDownLatch.await(); // wait for reroute to complete + assertTrue(rerouteExecuted.get()); // see above for assertion that it's only called once + } + + public void testNotifiesOnFailure() throws InterruptedException { + + final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + if (rarely()) { + throw new ElasticsearchException("simulated"); + } + return randomBoolean() ? s : ClusterState.builder(s).build(); + }); + routingService.start(); + + final int iterations = between(1, 100); + final CountDownLatch countDownLatch = new CountDownLatch(iterations); + for (int i = 0; i < iterations; i++) { + routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + if (rarely()) { + clusterService.getMasterService().setClusterStatePublisher( + randomBoolean() + ? ClusterServiceUtils.createClusterStatePublisher(clusterService.getClusterApplierService()) + : (event, publishListener, ackListener) + -> publishListener.onFailure(new FailedToCommitClusterStateException("simulated"))); + } + + if (rarely()) { + clusterService.getClusterApplierService().onNewClusterState("simulated", () -> { + ClusterState state = clusterService.state(); + return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()) + .masterNodeId(randomBoolean() ? null : state.nodes().getLocalNodeId())).build(); + }, (source, e) -> { }); + } + } + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); // i.e. 
it doesn't leak any listeners + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index b245b0d35d6c6..5ba5b7a0a709a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -35,16 +36,17 @@ import org.elasticsearch.common.settings.Settings; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; public class DiskThresholdMonitorTests extends ESAllocationTestCase { - public void testMarkFloodStageIndicesReadOnly() { AllocationService allocation = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); @@ -61,7 +63,6 @@ public void testMarkFloodStageIndicesReadOnly() { .addAsNew(metaData.index("test")) .addAsNew(metaData.index("test_1")) .addAsNew(metaData.index("test_2")) - .build(); ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData).routingTable(routingTable).build(); @@ -74,18 +75,21 @@ public void testMarkFloodStageIndicesReadOnly() { ClusterState finalState = clusterState; AtomicBoolean reroute = new AtomicBoolean(false); AtomicReference> indices = new AtomicReference<>(); + AtomicLong currentTime = new AtomicLong(); DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { @Override - protected void reroute() { - assertTrue(reroute.compareAndSet(false, true)); - } - - @Override - protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { + protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); + listener.onResponse(null); } }; + + monitor.setRerouteAction((reason, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + listener.onResponse(null); + }); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30)); @@ -97,6 +101,7 @@ protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5)); + currentTime.addAndGet(randomLongBetween(60001, 120000)); monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); assertTrue(reroute.get()); assertEquals(new HashSet<>(Arrays.asList("test_1", 
"test_2")), indices.get()); @@ -114,17 +119,17 @@ protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2")); monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { @Override - protected void reroute() { - assertTrue(reroute.compareAndSet(false, true)); - } - - @Override - protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { + protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); + listener.onResponse(null); } }; + monitor.setRerouteAction((reason, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + listener.onResponse(null); + }); indices.set(null); reroute.set(false); @@ -133,6 +138,90 @@ protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5)); monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); assertTrue(reroute.get()); - assertEquals(new HashSet<>(Arrays.asList("test_1")), indices.get()); + assertEquals(Collections.singleton("test_1"), indices.get()); + } + + public void testDoesNotSubmitRerouteTaskTooFrequently() { + final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); + AtomicLong currentTime = new AtomicLong(); + AtomicReference> listenerReference = new AtomicReference<>(); + DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { + @Override + protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { + throw new AssertionError("unexpected"); + } + }; + + monitor.setRerouteAction((reason, listener) -> { + assertNotNull(listener); + assertTrue(listenerReference.compareAndSet(null, listener)); + }); + + final ImmutableOpenMap.Builder allDisksOkBuilder; + allDisksOkBuilder = ImmutableOpenMap.builder(); + allDisksOkBuilder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50)); + allDisksOkBuilder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 50)); + final ImmutableOpenMap allDisksOk = allDisksOkBuilder.build(); + + final ImmutableOpenMap.Builder oneDiskAboveWatermarkBuilder = ImmutableOpenMap.builder(); + oneDiskAboveWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9))); + oneDiskAboveWatermarkBuilder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 50)); + final ImmutableOpenMap oneDiskAboveWatermark = oneDiskAboveWatermarkBuilder.build(); + + // should not reroute when all disks are ok + currentTime.addAndGet(randomLongBetween(0, 120000)); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); + + // should reroute when one disk goes over the watermark + currentTime.addAndGet(randomLongBetween(0, 120000)); + monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); + assertNotNull(listenerReference.get()); + listenerReference.getAndSet(null).onResponse(null); + + if 
(randomBoolean()) { + // should not re-route again within the reroute interval + currentTime.addAndGet(randomLongBetween(0, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis())); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); + } + + // should reroute again when one disk is still over the watermark + currentTime.addAndGet(randomLongBetween( + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); + monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); + assertNotNull(listenerReference.get()); + final ActionListener rerouteListener1 = listenerReference.getAndSet(null); + + // should not re-route again before reroute has completed + currentTime.addAndGet(randomLongBetween(0, 120000)); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); + + // complete reroute + rerouteListener1.onResponse(null); + + if (randomBoolean()) { + // should not re-route again within the reroute interval + currentTime.addAndGet(randomLongBetween(0, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis())); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); + } + + // should reroute again after the reroute interval + currentTime.addAndGet(randomLongBetween( + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNotNull(listenerReference.get()); + listenerReference.getAndSet(null).onResponse(null); + + // should not reroute again when it is not required + currentTime.addAndGet(randomLongBetween(0, 120000)); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 4580c5b59ed2d..595c144fa173e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.routing.allocation.decider; -import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; @@ -31,7 +30,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; import java.util.ArrayList; import java.util.Collection; @@ -53,7 +51,6 @@ protected Collection> nodePlugins() { return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); } - @TestLogging("org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.cluster.service:TRACE") public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { List nodes = internalCluster().startNodes(3); @@ -105,12 +102,6 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { assertBusy(() -> { final ClusterState clusterState = 
client().admin().cluster().prepareState().get().getState(); - logger.info("--> {}", clusterState.routingTable()); - - final RecoveryResponse recoveryResponse = client().admin().indices() - .prepareRecoveries("test").setActiveOnly(true).setDetailed(true).get(); - logger.info("--> recoveries: {}", recoveryResponse); - final Map nodesToShardCount = new HashMap<>(); for (final RoutingNode node : clusterState.getRoutingNodes()) { logger.info("--> node {} has {} shards", diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 99f94e567a5e2..80aaae2049fd5 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -213,7 +213,7 @@ allocationService, new AliasValidator(), environment, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); - joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, s -> {}); + joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, (s, r) -> {}); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 07da055185a84..a00dfd71eb1ce 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -950,7 +950,7 @@ protected void assertSnapshotOrGenericThread() { transportService, indicesService, actionFilters, indexNameExpressionResolver); final ShardStateAction shardStateAction = new ShardStateAction( clusterService, transportService, allocationService, - new RoutingService(clusterService, allocationService), + new RoutingService(clusterService, allocationService::reroute), threadPool ); final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService); @@ -1134,7 +1134,7 @@ public void start(ClusterState initialState) { hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()) .map(n -> n.node.getAddress()).collect(Collectors.toList()), clusterService.getClusterApplierService(), Collections.emptyList(), random(), - new RoutingService(clusterService, allocationService)::reroute, ElectionStrategy.DEFAULT_INSTANCE); + new RoutingService(clusterService, allocationService::reroute)::reroute, ElectionStrategy.DEFAULT_INSTANCE); masterService.setClusterStatePublisher(coordinator); coordinator.start(); masterService.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 8bdedaceba71c..0c27f84d7f1e4 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -841,7 +841,7 @@ protected Optional getDisruptableMockTransport(Transpo final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY); coordinator 
= new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), allocationService, masterService, this::getPersistedState, - Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {}, + Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, r) -> {}, getElectionStrategy()); masterService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
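Stepping back from the individual hunks: the central mechanism of this patch is in RoutingService, which replaces the old rerouting AtomicBoolean (which simply ignored a reroute request while another was pending) with a single pending batch of listeners, all of which are notified when the one submitted cluster-state update completes or fails. The following is a minimal, self-contained sketch of that coalescing idea using plain JDK types; CompletableFuture stands in for PlainListenableActionFuture, a single-threaded executor stands in for the master service, and every name is illustrative rather than taken from the patch.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Sketch of the "one pending batch" pattern: while a reroute is queued but not yet executed,
// additional requests only attach their callbacks to the same future instead of queuing again.
class BatchedRerouter {
    private final ExecutorService masterService = Executors.newSingleThreadExecutor();
    private final Object mutex = new Object();
    private CompletableFuture<Void> pending; // null when no reroute is currently queued

    void reroute(String reason, BiConsumer<Void, Throwable> listener) {
        final CompletableFuture<Void> current;
        synchronized (mutex) {
            if (pending != null) {
                pending.whenComplete(listener); // join the batch that is already queued
                return;
            }
            current = new CompletableFuture<>();
            current.whenComplete(listener);
            pending = current;
        }
        masterService.execute(() -> {
            synchronized (mutex) {
                pending = null; // requests arriving from now on start a new batch
            }
            try {
                doReroute(reason);          // stand-in for the AllocationService reroute call
                current.complete(null);     // notify every listener in this batch
            } catch (Exception e) {
                current.completeExceptionally(e);
            }
        });
    }

    private void doReroute(String reason) {
        // placeholder for the actual shard-allocation work
    }
}

As in the real change, the pending batch is cleared at the start of the task's execution under the same mutex, so a request that arrives while the reroute computation is running starts a fresh batch rather than being lost; this is also what RoutingServiceTests.testBatchesReroutesTogether above exercises by blocking the master service and asserting the batched requests trigger exactly one reroute.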