Decouple DiskThresholdMonitor & ClusterInfoService (#44105)
Today the `ClusterInfoService` requires the `DiskThresholdMonitor` at
construction time so that it can notify it when nodes report changes in their
disk usage, but this is awkward to construct: the `DiskThresholdMonitor`
requires a `RerouteService` which requires an `AllocationService` which comes
from the `ClusterModule` which requires the `ClusterInfoService`.

Today we break the cycle with a `LazilyInitializedRerouteService` which is
itself a little ugly. This commit replaces this with a more traditional
subject/observer relationship between the `ClusterInfoService` and the
`DiskThresholdMonitor`.
DaveCTurner authored Jul 9, 2019
1 parent b842ea8 commit af512f9
Showing 8 changed files with 84 additions and 128 deletions.
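
Before the per-file diffs, here is a minimal, self-contained sketch of the subject/observer shape the commit message describes: the `ClusterInfoService` is built first, and the `DiskThresholdMonitor` subscribes to it later via `addListener`, once the real `RerouteService` exists. The interface shape mirrors the one in this diff, but `ObserverWiringSketch`, `SketchClusterInfoService` and `publish` are illustrative stand-ins, not the Elasticsearch implementations.

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.function.Consumer;

    // Illustrative stand-ins only; the real types are ClusterInfoService,
    // InternalClusterInfoService and DiskThresholdMonitor in the diff below.
    public final class ObserverWiringSketch {

        interface ClusterInfo {}

        interface ClusterInfoService {
            ClusterInfo getClusterInfo();

            // Observers subscribe here instead of being injected at construction time.
            default void addListener(Consumer<ClusterInfo> listener) {
                throw new UnsupportedOperationException();
            }
        }

        static final class SketchClusterInfoService implements ClusterInfoService {
            private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();
            private volatile ClusterInfo latest = new ClusterInfo() {};

            @Override
            public ClusterInfo getClusterInfo() {
                return latest;
            }

            @Override
            public void addListener(Consumer<ClusterInfo> listener) {
                listeners.add(listener);
            }

            // Stands in for the periodic stats refresh: record the new info, then notify observers.
            void publish(ClusterInfo info) {
                latest = info;
                listeners.forEach(listener -> listener.accept(info));
            }
        }

        public static void main(String[] args) {
            SketchClusterInfoService clusterInfoService = new SketchClusterInfoService();
            // The monitor is constructed later (once a RerouteService exists) and simply subscribes,
            // which is what breaks the construction-time cycle described above.
            clusterInfoService.addListener(info -> System.out.println("disk threshold monitor notified"));
            clusterInfoService.publish(new ClusterInfo() {});
        }
    }

The real InternalClusterInfoService keeps its listeners in a synchronized list and wraps each notification in try/catch so that one failing listener cannot prevent the others from being notified, as the diff below shows.
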
@@ -19,12 +19,23 @@

package org.elasticsearch.cluster;

import java.util.function.Consumer;

/**
- * Interface for a class used to gather information about a cluster at
- * regular intervals
+ * Interface for a class used to gather information about a cluster periodically.
*/
@FunctionalInterface
public interface ClusterInfoService {

-/** The latest cluster information */
+/**
+ * @return the latest cluster information
+ */
ClusterInfo getClusterInfo();

/**
* Add a listener for new cluster information
*/
default void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
throw new UnsupportedOperationException();
}
}
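
A side effect of adding @FunctionalInterface and a default addListener is that a fixed-info service can now be written as a lambda, which the updated DiskThresholdDeciderTests below relies on. As a hedged illustration, FixedClusterInfoService is a hypothetical helper (not part of this commit), assuming the Elasticsearch server classes are on the classpath:

    import org.elasticsearch.cluster.ClusterInfo;
    import org.elasticsearch.cluster.ClusterInfoService;

    // Hypothetical convenience wrapper: serve a fixed ClusterInfo via a lambda.
    final class FixedClusterInfoService {
        static ClusterInfoService of(ClusterInfo info) {
            // Legal because getClusterInfo() is the single abstract method; addListener keeps its default.
            return () -> info;
        }
    }

Unlike EmptyClusterInfoService below, such a lambda-backed service still throws UnsupportedOperationException from addListener, since it does not override the default.
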
@@ -19,8 +19,10 @@

package org.elasticsearch.cluster;

import java.util.function.Consumer;

/**
- * ClusterInfoService that provides empty maps for disk usage and shard sizes
+ * {@link ClusterInfoService} that provides empty maps for disk usage and shard sizes
*/
public class EmptyClusterInfoService implements ClusterInfoService {
public static final EmptyClusterInfoService INSTANCE = new EmptyClusterInfoService();
@@ -29,4 +31,9 @@ public class EmptyClusterInfoService implements ClusterInfoService {
public ClusterInfo getClusterInfo() {
return ClusterInfo.EMPTY;
}

@Override
public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
// never updated, so we can discard the listener
}
}
@@ -21,6 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -46,6 +47,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -85,10 +88,9 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
-private final Consumer<ClusterInfo> listener;
+private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));

-public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
-                                  Consumer<ClusterInfo> listener) {
+public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
this.shardRoutingToDataPath = ImmutableOpenMap.of();
@@ -109,7 +111,6 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
this.clusterService.addLocalNodeMasterListener(this);
// Add to listen for state changes (when nodes are added)
this.clusterService.addListener(this);
-this.listener = listener;
}

private void setEnabled(boolean enabled) {
@@ -356,14 +357,25 @@ public void onFailure(Exception e) {
Thread.currentThread().interrupt(); // restore interrupt status
}
ClusterInfo clusterInfo = getClusterInfo();
-try {
-    listener.accept(clusterInfo);
-} catch (Exception e) {
-    logger.info("Failed executing ClusterInfoService listener", e);
-}
+boolean anyListeners = false;
+for (final Consumer<ClusterInfo> listener : listeners) {
+    anyListeners = true;
+    try {
+        logger.trace("notifying [{}] of new cluster info", listener);
+        listener.accept(clusterInfo);
+    } catch (Exception e) {
+        logger.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e);
+    }
+}
+assert anyListeners : "expected to notify at least one listener";
return clusterInfo;
}

@Override
public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
listeners.add(clusterInfoConsumer);
}

static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
for (ShardStats s : stats) {

This file was deleted.

20 changes: 8 additions & 12 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -26,8 +26,8 @@
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionModule;
+import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
@@ -38,7 +38,6 @@
import org.elasticsearch.bootstrap.BootstrapContext;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
@@ -57,7 +56,6 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.BatchedRerouteService;
-import org.elasticsearch.cluster.routing.LazilyInitializedRerouteService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.cluster.service.ClusterService;
@@ -177,7 +175,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -371,11 +368,7 @@ protected Node(
.newHashPublisher());
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
-final LazilyInitializedRerouteService lazilyInitializedRerouteService = new LazilyInitializedRerouteService();
-final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
-    clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, lazilyInitializedRerouteService);
-final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
-    diskThresholdMonitor::onNewInfo);
+final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();

ModulesBuilder modules = new ModulesBuilder();
@@ -508,7 +501,10 @@ protected Node(

final RerouteService rerouteService
= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
-lazilyInitializedRerouteService.setRerouteService(rerouteService);
+final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
+    clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
+clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
@@ -1017,8 +1013,8 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc

/** Constructs a ClusterInfoService which may be mocked for tests. */
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
-                                                   ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listeners) {
-    return new InternalClusterInfoService(settings, clusterService, threadPool, client, listeners);
+                                                   ThreadPool threadPool, NodeClient client) {
+    return new InternalClusterInfoService(settings, clusterService, threadPool, client);
}

/** Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests. */
@@ -100,12 +100,9 @@ public void testDiskThreshold() {
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));

-ClusterInfoService cis = new ClusterInfoService() {
-    @Override
-    public ClusterInfo getClusterInfo() {
-        logger.info("--> calling fake getClusterInfo");
-        return clusterInfo;
-    }
+ClusterInfoService cis = () -> {
+    logger.info("--> calling fake getClusterInfo");
+    return clusterInfo;
};
AllocationService strategy = new AllocationService(deciders,
new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
@@ -278,12 +275,9 @@ public void testDiskThresholdWithAbsoluteSizes() {
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));

-ClusterInfoService cis = new ClusterInfoService() {
-    @Override
-    public ClusterInfo getClusterInfo() {
-        logger.info("--> calling fake getClusterInfo");
-        return clusterInfo;
-    }
+ClusterInfoService cis = () -> {
+    logger.info("--> calling fake getClusterInfo");
+    return clusterInfo;
};

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -328,12 +322,9 @@ public ClusterInfo getClusterInfo() {
usagesBuilder.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "/dev/null", 100, 35)); // 65% used
usages = usagesBuilder.build();
final ClusterInfo clusterInfo2 = new DevNullClusterInfo(usages, usages, shardSizes);
-cis = new ClusterInfoService() {
-    @Override
-    public ClusterInfo getClusterInfo() {
-        logger.info("--> calling fake getClusterInfo");
-        return clusterInfo2;
-    }
+cis = () -> {
+    logger.info("--> calling fake getClusterInfo");
+    return clusterInfo2;
};
strategy = new AllocationService(deciders, new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY), cis);
@@ -516,12 +507,9 @@ Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLU
),
makeDecider(diskSettings))));

-ClusterInfoService cis = new ClusterInfoService() {
-    @Override
-    public ClusterInfo getClusterInfo() {
-        logger.info("--> calling fake getClusterInfo");
-        return clusterInfo;
-    }
+ClusterInfoService cis = () -> {
+    logger.info("--> calling fake getClusterInfo");
+    return clusterInfo;
};

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -580,12 +568,9 @@ Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLU
),
makeDecider(diskSettings))));

-ClusterInfoService cis = new ClusterInfoService() {
-    @Override
-    public ClusterInfo getClusterInfo() {
-        logger.info("--> calling fake getClusterInfo");
-        return clusterInfo;
-    }
+ClusterInfoService cis = () -> {
+    logger.info("--> calling fake getClusterInfo");
+    return clusterInfo;
};

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -677,12 +662,9 @@ public void testShardRelocationsTakenIntoAccount() {
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
), decider)));

-ClusterInfoService cis = new ClusterInfoService() {
-    @Override
-    public ClusterInfo getClusterInfo() {
-        logger.info("--> calling fake getClusterInfo");
-        return clusterInfo;
-    }
+ClusterInfoService cis = () -> {
+    logger.info("--> calling fake getClusterInfo");
+    return clusterInfo;
};

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -856,12 +838,9 @@ public void testCanRemainWithShardRelocatingAway() {
}

// Creating AllocationService instance and the services it depends on...
-ClusterInfoService cis = new ClusterInfoService() {
-    @Override
-    public ClusterInfo getClusterInfo() {
-        logger.info("--> calling fake getClusterInfo");
-        return clusterInfo;
-    }
+ClusterInfoService cis = () -> {
+    logger.info("--> calling fake getClusterInfo");
+    return clusterInfo;
};
AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(
@@ -948,13 +927,10 @@ public void testForSingleDataNode() {

// Two shards should start happily
assertThat(decision.type(), equalTo(Decision.Type.YES));
-assertThat(((Decision.Single) decision).getExplanation(), containsString("there is only a single data node present"));
-ClusterInfoService cis = new ClusterInfoService() {
-    @Override
-    public ClusterInfo getClusterInfo() {
-        logger.info("--> calling fake getClusterInfo");
-        return clusterInfo;
-    }
+assertThat(decision.getExplanation(), containsString("there is only a single data node present"));
+ClusterInfoService cis = () -> {
+    logger.info("--> calling fake getClusterInfo");
+    return clusterInfo;
};

AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList(
@@ -18,11 +18,6 @@
*/
package org.elasticsearch.cluster;

-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.CountDownLatch;
-import java.util.function.Consumer;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -40,6 +35,10 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

@@ -71,9 +70,8 @@ public static NodeStats makeStats(String nodeName, DiskUsage usage) {
null, null, null, null);
}

-public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
-                                      Consumer<ClusterInfo> listener) {
-    super(settings, clusterService, threadPool, client, listener);
+public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
+    super(settings, clusterService, threadPool, client);
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100));
stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100));