Skip to content

Commit

Permalink
Add _replica and _replica_first as search preference.
Browse files Browse the repository at this point in the history
Just like specifying `?preference=_primary`, this adds the ability to
specify `?preference=_replica` or `?preference=_replica_first` on
requests that support it.

Resolves elastic#12222
  • Loading branch information
dakrone committed Jul 16, 2015
1 parent 439c67a commit a8391fc
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ImmutableList<ShardRouting> shards;
final ImmutableList<ShardRouting> activeShards;
final ImmutableList<ShardRouting> assignedShards;
final static ImmutableList<ShardRouting> NO_SHARDS = ImmutableList.of();
final boolean allShardsStarted;

/**
Expand Down Expand Up @@ -279,6 +280,16 @@ public ShardIterator activeInitializingShardsIt(int seed) {
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns true if no primaries are active or initializing for this shard
*/
private boolean noPrimariesActive() {
if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) {
return true;
}
return false;
}

/**
* Returns an iterator only on the primary shard.
*/
Expand All @@ -287,9 +298,8 @@ public ShardIterator primaryShardIt() {
}

public ShardIterator primaryActiveInitializingShardIt() {
if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) {
List<ShardRouting> primaryList = ImmutableList.of();
return new PlainShardIterator(shardId, primaryList);
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}
return primaryShardIt();
}
Expand All @@ -312,6 +322,49 @@ public ShardIterator primaryFirstActiveInitializingShardsIt() {
return new PlainShardIterator(shardId, ordered);
}

public ShardIterator replicaActiveInitializingShardIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}

LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
}

public ShardIterator replicaFirstActiveInitializingShardsIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}

ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion with the active replicas
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.add(replica);
}
}

// Add the primary shard
ordered.add(primary);

// Add initializing shards last
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}

public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
return indexShard.preferNodeActiveInitializingShardsIt(localNodeId);
case PRIMARY:
return indexShard.primaryActiveInitializingShardIt();
case REPLICA:
return indexShard.replicaActiveInitializingShardIt();
case PRIMARY_FIRST:
return indexShard.primaryFirstActiveInitializingShardsIt();
case REPLICA_FIRST:
return indexShard.replicaFirstActiveInitializingShardsIt();
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,21 @@ public enum Preference {
*/
PRIMARY("_primary"),

/**
* Route to replica shards
*/
REPLICA("_replica"),

/**
* Route to primary shards first
*/
PRIMARY_FIRST("_primary_first"),

/**
* Route to replica shards first
*/
REPLICA_FIRST("_replica_first"),

/**
* Route to the local shard only
*/
Expand Down Expand Up @@ -96,9 +106,14 @@ public static Preference parse(String preference) {
return LOCAL;
case "_primary":
return PRIMARY;
case "_replica":
return REPLICA;
case "_primary_first":
case "_primaryFirst":
return PRIMARY_FIRST;
case "_replica_first":
case "_replicaFirst":
return REPLICA_FIRST;
case "_only_local":
case "_onlyLocal":
return ONLY_LOCAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,4 +381,90 @@ public void testShardsAndPreferNodeRouting() {
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), equalTo("node1"));
}

@Test
public void testReplicaShardPreferenceIters() throws Exception {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.build());

OperationRouting operationRouting = new OperationRouting(Settings.Builder.EMPTY_SETTINGS, new AwarenessAllocationDecider());

MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(2))
.build();

RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();

ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();

clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node1"))
.put(newNode("node2"))
.put(newNode("node3"))
.localNodeId("node1")
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

// When replicas haven't initialized, it comes back with the primary first, then initializing replicas
GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
ShardIterator iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard
ShardRouting routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertTrue(routing.primary()); // replicas haven't initialized yet, so primary is first
assertTrue(routing.started());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
assertTrue(routing.initializing());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
assertTrue(routing.initializing());

routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();


shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(2)); // two potential replicas for the shard
routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());

shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard
routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
// finally the primary
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertTrue(routing.primary());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void noPreferenceRandom() throws Exception {

@Test
public void simplePreferenceTests() throws Exception {
createIndex("test");
client().admin().indices().prepareCreate("test").setSettings("number_of_replicas=1").get();
ensureGreen();

client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
Expand All @@ -104,12 +104,47 @@ public void simplePreferenceTests() throws Exception {
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
}

@Test
public void testReplicaPreference() throws Exception {
client().admin().indices().prepareCreate("test").setSettings("number_of_replicas=0").get();
ensureGreen();

client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
refresh();

try {
client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
fail("should have failed because there are no replicas");
} catch (Exception e) {
// pass
}

SearchResponse resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(resp.getHits().totalHits(), equalTo(1l));

client().admin().indices().prepareUpdateSettings("test").setSettings("number_of_replicas=1").get();
ensureGreen("test");

resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(resp.getHits().totalHits(), equalTo(1l));
}

@Test (expected = IllegalArgumentException.class)
public void testThatSpecifyingNonExistingNodesReturnsUsefulError() throws Exception {
createIndex("test");
Expand Down
7 changes: 7 additions & 0 deletions docs/reference/search/request/preference.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ The `preference` is a query string parameter which can be set to:
The operation will go and be executed on the primary
shard, and if not available (failover), will execute on other shards.

`_replica`::
The operation will go and be executed only on a replica shard.

`_replica_first`::
The operation will go and be executed only on a replica shard, and if
not available (failover), will execute on other shards.

`_local`::
The operation will prefer to be executed on a local
allocated shard if possible.
Expand Down

0 comments on commit a8391fc

Please sign in to comment.