Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[RW Separation] Introduce allocation filter to control placement of s…
Browse files Browse the repository at this point in the history
…earch only replicas (#15455)

* Introduce allocation filter to control placement of search only replicas

Signed-off-by: Marc Handalian <[email protected]>

* Add a new decider rather than updating the existing FilterAllocationDecider

Signed-off-by: Marc Handalian <[email protected]>

* Fix license header and description on SearchReplicaAllocationDecider

Signed-off-by: Marc Handalian <[email protected]>

* Pr feedback.

Signed-off-by: Marc Handalian <[email protected]>

* Fix class name to pass precommit checks

Signed-off-by: Marc Handalian <[email protected]>

* Refactor all search replica create/update tests to a single OpenSearchSingleNodeTestCase.

Signed-off-by: Marc Handalian <[email protected]>

* remove changelog entry

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
(cherry picked from commit b345439)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
github-actions[bot] committed Sep 4, 2024
1 parent bd5041f commit 425f1de
Showing 9 changed files with 546 additions and 133 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.allocation;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}

public void testSearchReplicaDedicatedIncludes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
)
.execute()
.actionGet();

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen("test");
// ensure primary is not on node 0 or 1,
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));

String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;

// set the included nodes to the other open node, search replica should relocate to that node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
.execute()
.actionGet();
ensureGreen("test");

routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
}

public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
.execute()
.actionGet();

logger.info("--> creating an index with no replicas");
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureYellowAndNoInitializingShards("test");
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(2, routingTable.searchOnlyReplicas().size());
List<ShardRouting> assignedSearchShards = routingTable.searchOnlyReplicas()
.stream()
.filter(ShardRouting::assignedToNode)
.collect(Collectors.toList());
assertEquals(1, assignedSearchShards.size());
assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId()));
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
}

private IndexShardRoutingTable getRoutingTable() {
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
return routingTable;
}

private String getNodeName(String id) {
return getClusterState().nodes().get(id).getName();
}
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
@@ -52,4 +53,15 @@ public void testUpdateFeatureFlagDisabled() {
});
assertTrue(exception.getMessage().contains("unknown setting"));
}

public void testFilterAllocationSettingNotRegistered() {
expectThrows(SettingsException.class, () -> {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
.execute()
.actionGet();
});
}
}
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@
import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
@@ -84,6 +85,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.ParseField;
@@ -376,6 +378,9 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) {
addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings));
}
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation.decider;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeFilters;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Map;

import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR;
import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR;

/**
* This allocation decider is similar to FilterAllocationDecider but provides
* the option to filter specifically for search replicas.
* The filter behaves similar to an include for any defined node attribute.
* A search replica can be allocated to only nodes with one of the specified attributes while
* other shard types will be rejected from nodes with any othe attributes.
* @opensearch.internal
*/
public class SearchReplicaAllocationDecider extends AllocationDecider {

public static final String NAME = "filter";
private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include";
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting(
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".",
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
);

private volatile DiscoveryNodeFilters searchReplicaIncludeFilters;

private volatile RemoteStoreNodeService.Direction migrationDirection;
private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode;

public SearchReplicaAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
clusterSettings.addAffixMapUpdateConsumer(
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
this::setSearchReplicaIncludeFilters,
(a, b) -> {}
);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}

private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
if (searchReplicaIncludeFilters != null) {
final boolean match = searchReplicaIncludeFilters.match(node);
if (match == false && shardRouting.isSearchOnly()) {
return allocation.decision(
Decision.NO,
NAME,
"node does not match shard setting [%s] filters [%s]",
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
searchReplicaIncludeFilters
);
}
// filter will only apply to search replicas
if (shardRouting.isSearchOnly() == false && match) {
return allocation.decision(
Decision.NO,
NAME,
"only search replicas can be allocated to node with setting [%s] filters [%s]",
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
searchReplicaIncludeFilters
);
}
}
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}

private void setSearchReplicaIncludeFilters(Map<String, String> filters) {
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier(
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters)
);
}
}
Original file line number Diff line number Diff line change
@@ -77,6 +77,7 @@
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.service.ClusterApplierService;
@@ -816,6 +817,8 @@ public void apply(Settings value, Settings current, Settings previous) {
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
)
)
),
List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL),
List.of(SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING)
);
}
Original file line number Diff line number Diff line change
@@ -138,12 +138,10 @@
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
@@ -157,7 +155,6 @@
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases;
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_MERGE_POLICY;

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -323,7 +323,7 @@ private ClusterState createInitialClusterState(AllocationService service, Settin
return createInitialClusterState(service, indexSettings, Settings.EMPTY);
}

private ClusterState createInitialClusterState(AllocationService service, Settings idxSettings, Settings clusterSettings) {
static ClusterState createInitialClusterState(AllocationService service, Settings idxSettings, Settings clusterSettings) {
Metadata.Builder metadata = Metadata.builder();
metadata.persistentSettings(clusterSettings);
final Settings.Builder indexSettings = settings(Version.CURRENT).put(idxSettings);
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation.decider;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.EmptyClusterInfoService;
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.test.gateway.TestGatewayAllocator;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

public class SearchReplicaAllocationDeciderTests extends OpenSearchAllocationTestCase {

public void testSearchReplicaRoutingDedicatedIncludes() {
// we aren't using a settingsModule here so we need to set feature flag gated setting
Set<Setting<?>> settings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settings.add(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING);
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), settings);
Settings initialSettings = Settings.builder()
.put("cluster.routing.allocation.search.replica.dedicated.include._id", "node1,node2")
.build();

SearchReplicaAllocationDecider filterAllocationDecider = new SearchReplicaAllocationDecider(initialSettings, clusterSettings);
AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(
filterAllocationDecider,
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider()
)
);
AllocationService service = new AllocationService(
allocationDeciders,
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);
ClusterState state = FilterAllocationDeciderTests.createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY);
RoutingTable routingTable = state.routingTable();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
allocation.debugDecision(true);

ShardRouting searchReplica = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
false,
true,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "")
);

ShardRouting regularReplica = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
false,
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
);

ShardRouting primary = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
true,
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
);

Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate(
searchReplica,
state.getRoutingNodes().node("node2"),
allocation
);
assertEquals(decision.toString(), Decision.Type.YES, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.YES, decision.type());

decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Decision.Type.NO, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.NO, decision.type());

decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.NO, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Decision.Type.NO, decision.type());

Settings updatedSettings = Settings.builder()
.put("cluster.routing.allocation.search.replica.dedicated.include._id", "node2")
.build();
clusterSettings.applySettings(updatedSettings);

decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Decision.Type.YES, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.NO, decision.type());
decision = (Decision.Single) filterAllocationDecider.canRemain(searchReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.NO, decision.type());

decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Decision.Type.NO, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.YES, decision.type());
decision = (Decision.Single) filterAllocationDecider.canRemain(regularReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.YES, decision.type());

decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.YES, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Decision.Type.NO, decision.type());
decision = (Decision.Single) filterAllocationDecider.canRemain(primary, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Decision.Type.YES, decision.type());
}
}

0 comments on commit 425f1de

Please sign in to comment.