Decouple DiskThresholdMonitor & ClusterInfoService #44105

Merged
ClusterInfoService.java
@@ -19,12 +19,23 @@

package org.elasticsearch.cluster;

import java.util.function.Consumer;

/**
* Interface for a class used to gather information about a cluster at
* regular intervals
* Interface for a class used to gather information about a cluster periodically.
*/
@FunctionalInterface
public interface ClusterInfoService {

/** The latest cluster information */
/**
* @return the latest cluster information
*/
ClusterInfo getClusterInfo();

/**
* Add a listener for new cluster information
*/
default void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
throw new UnsupportedOperationException();
}
}
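
Aside (not part of the diff): because getClusterInfo() is now the interface's only abstract method, the @FunctionalInterface annotation lets a ClusterInfoService be supplied as a lambda, which the test changes further down rely on. A minimal sketch, assuming the Elasticsearch server classes are on the classpath; the class name is hypothetical:

```java
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;

// Illustrative only: a fixed ClusterInfoService expressed as a lambda.
public class LambdaClusterInfoServiceSketch {
    public static void main(String[] args) {
        ClusterInfoService fixedInfoService = () -> ClusterInfo.EMPTY; // single abstract method
        System.out.println(fixedInfoService.getClusterInfo());
    }
}
```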
EmptyClusterInfoService.java
@@ -19,8 +19,10 @@

package org.elasticsearch.cluster;

import java.util.function.Consumer;

/**
* ClusterInfoService that provides empty maps for disk usage and shard sizes
* {@link ClusterInfoService} that provides empty maps for disk usage and shard sizes
*/
public class EmptyClusterInfoService implements ClusterInfoService {
public static final EmptyClusterInfoService INSTANCE = new EmptyClusterInfoService();
@@ -29,4 +31,9 @@ public class EmptyClusterInfoService implements ClusterInfoService {
public ClusterInfo getClusterInfo() {
return ClusterInfo.EMPTY;
}

@Override
public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
// never updated, so we can discard the listener
}
}
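
Aside (not part of the diff): the override above makes the two addListener behaviours explicit — EmptyClusterInfoService silently discards listeners because it never publishes updates, while an implementation that keeps the interface default rejects them. A hedged sketch using the types from this diff; the wrapper class name is hypothetical:

```java
import java.util.function.Consumer;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.EmptyClusterInfoService;

public class AddListenerBehaviourSketch {
    public static void main(String[] args) {
        Consumer<ClusterInfo> listener = info -> System.out.println("new cluster info: " + info);

        // Accepted but never invoked: EmptyClusterInfoService produces no updates.
        EmptyClusterInfoService.INSTANCE.addListener(listener);

        // A bare lambda keeps the interface default, which throws.
        ClusterInfoService lambdaService = () -> ClusterInfo.EMPTY;
        try {
            lambdaService.addListener(listener);
        } catch (UnsupportedOperationException expected) {
            System.out.println("default addListener is unsupported");
        }
    }
}
```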
InternalClusterInfoService.java
@@ -21,6 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -46,6 +47,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -85,10 +88,9 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
private final Consumer<ClusterInfo> listener;
private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));

public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
Consumer<ClusterInfo> listener) {
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
this.shardRoutingToDataPath = ImmutableOpenMap.of();
@@ -109,7 +111,6 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
this.clusterService.addLocalNodeMasterListener(this);
// Add to listen for state changes (when nodes are added)
this.clusterService.addListener(this);
this.listener = listener;
}

private void setEnabled(boolean enabled) {
@@ -356,14 +357,25 @@ public void onFailure(Exception e) {
Thread.currentThread().interrupt(); // restore interrupt status
}
ClusterInfo clusterInfo = getClusterInfo();
try {
listener.accept(clusterInfo);
} catch (Exception e) {
logger.info("Failed executing ClusterInfoService listener", e);
boolean anyListeners = false;
for (final Consumer<ClusterInfo> listener : listeners) {
anyListeners = true;
try {
logger.trace("notifying [{}] of new cluster info", listener);
listener.accept(clusterInfo);
} catch (Exception e) {
logger.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e);
}
}
assert anyListeners : "expected to notify at least one listener";
return clusterInfo;
}

@Override
public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
listeners.add(clusterInfoConsumer);
}

static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
for (ShardStats s : stats) {
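Aside (not part of the diff): the loop added to refresh() above invokes each listener inside its own try/catch, so one misbehaving listener only produces a log line and the remaining listeners are still notified. A stripped-down, self-contained sketch of that shape; the helper class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

import org.elasticsearch.cluster.ClusterInfo;

// Illustrative only: the "notify every listener, isolate failures" pattern on its own.
public class NotifyListenersSketch {

    static void notifyListeners(List<Consumer<ClusterInfo>> listeners, ClusterInfo clusterInfo) {
        for (Consumer<ClusterInfo> listener : listeners) {
            try {
                listener.accept(clusterInfo);
            } catch (Exception e) {
                // log and continue: a failing listener must not starve the others
                System.err.println("failed to notify " + listener + ": " + e);
            }
        }
    }

    public static void main(String[] args) {
        List<Consumer<ClusterInfo>> listeners = Arrays.asList(
            info -> { throw new IllegalStateException("boom"); },
            info -> System.out.println("second listener still notified"));
        notifyListeners(listeners, ClusterInfo.EMPTY);
    }
}
```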

This file was deleted.

20 changes: 8 additions & 12 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -26,8 +26,8 @@
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
@@ -38,7 +38,6 @@
import org.elasticsearch.bootstrap.BootstrapContext;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
@@ -57,7 +56,6 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.BatchedRerouteService;
import org.elasticsearch.cluster.routing.LazilyInitializedRerouteService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.cluster.service.ClusterService;
@@ -177,7 +175,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -371,11 +368,7 @@ protected Node(
.newHashPublisher());
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final LazilyInitializedRerouteService lazilyInitializedRerouteService = new LazilyInitializedRerouteService();
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, lazilyInitializedRerouteService);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
diskThresholdMonitor::onNewInfo);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();

ModulesBuilder modules = new ModulesBuilder();
@@ -508,7 +501,10 @@ protected Node(

final RerouteService rerouteService
= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
lazilyInitializedRerouteService.setRerouteService(rerouteService);
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
@@ -1017,8 +1013,8 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc

/** Constructs a ClusterInfoService which may be mocked for tests. */
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listeners) {
return new InternalClusterInfoService(settings, clusterService, threadPool, client, listeners);
ThreadPool threadPool, NodeClient client) {
return new InternalClusterInfoService(settings, clusterService, threadPool, client);
}

/** Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests. */
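Aside (not part of the diff): the reordering above is the core of the change. Node now builds the ClusterInfoService without knowing about the DiskThresholdMonitor, constructs the monitor later with the real RerouteService, and couples the two with a single clusterInfoService.addListener(diskThresholdMonitor::onNewInfo) call, which is why the LazilyInitializedRerouteService indirection is no longer needed. A self-contained sketch of that wiring shape, using hypothetical stand-in types rather than the Elasticsearch ones:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative only: with constructor injection the monitor had to exist before the info
// service; with listener registration each side is built with its real dependencies and
// the coupling happens on a single line at the end.
public class WiringSketch {

    static class InfoService {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        void addListener(Consumer<String> listener) { listeners.add(listener); }
        void publish(String info) { listeners.forEach(l -> l.accept(info)); }
    }

    static class Monitor {
        private final Runnable reroute;
        Monitor(Runnable reroute) { this.reroute = reroute; }
        void onNewInfo(String info) {
            System.out.println("monitor saw: " + info);
            reroute.run(); // stand-in for the real reroute call
        }
    }

    public static void main(String[] args) {
        InfoService infoService = new InfoService();                        // built early, knows nothing of monitors
        Monitor monitor = new Monitor(() -> System.out.println("reroute")); // built once the reroute service exists
        infoService.addListener(monitor::onNewInfo);                        // the single coupling point
        infoService.publish("node_t1 is low on disk");
    }
}
```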
DiskThresholdDeciderTests.java
@@ -100,12 +100,9 @@ public void testDiskThreshold() {
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));

ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
};
AllocationService strategy = new AllocationService(deciders,
new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
@@ -278,12 +275,9 @@ public void testDiskThresholdWithAbsoluteSizes() {
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));

ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
};

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -328,12 +322,9 @@ public ClusterInfo getClusterInfo() {
usagesBuilder.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "/dev/null", 100, 35)); // 65% used
usages = usagesBuilder.build();
final ClusterInfo clusterInfo2 = new DevNullClusterInfo(usages, usages, shardSizes);
cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo2;
}
cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo2;
};
strategy = new AllocationService(deciders, new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY), cis);
@@ -516,12 +507,9 @@ Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLU
),
makeDecider(diskSettings))));

ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
};

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -580,12 +568,9 @@ Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLU
),
makeDecider(diskSettings))));

ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
};

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -677,12 +662,9 @@ public void testShardRelocationsTakenIntoAccount() {
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
), decider)));

ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
};

AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -856,12 +838,9 @@ public void testCanRemainWithShardRelocatingAway() {
}

// Creating AllocationService instance and the services it depends on...
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
};
AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(
@@ -948,13 +927,10 @@ public void testForSingleDataNode() {

// Two shards should start happily
assertThat(decision.type(), equalTo(Decision.Type.YES));
assertThat(((Decision.Single) decision).getExplanation(), containsString("there is only a single data node present"));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
assertThat(decision.getExplanation(), containsString("there is only a single data node present"));
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
};

AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList(
MockInternalClusterInfoService.java
@@ -18,11 +18,6 @@
*/
package org.elasticsearch.cluster;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -40,6 +35,10 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

@@ -71,9 +70,8 @@ public static NodeStats makeStats(String nodeName, DiskUsage usage) {
null, null, null, null);
}

public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
Consumer<ClusterInfo> listener) {
super(settings, clusterService, threadPool, client, listener);
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
super(settings, clusterService, threadPool, client);
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100));
stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100));