Skip to content

Commit

Permalink
Encapsulate HPPC usage inside of ImmutableOpen(Int)Map (#80859)
Browse files Browse the repository at this point in the history
This commit encapsulates a lot of the usage of HPPC into `ImmutableOpenMap` and `ImmutableOpenIntMap`.

To accomplish this, this commit adds the `entrySet()` method to these classes returning `Set<Map.Entry<KType, VType>>`. This allows a map to be streamable using java Streams (`myMap.entrySet().stream()`) as well as ensuring that using a Java `foreach` loop **does not leak the underlying HPPC implementation outside of the ImmutableOpenMap**. This means that a variable could be switched from `ImmutableOpenMap` to `HashMap` without affecting the traversal of its contents.

The rest of this PR is migrating usages of the HPPC iterators (`IntObjectCursor`, `ObjectCursor`, `ObjectObjectCursor`) to use `entrySet()` and regular java `Map.Entry<>` looping.

I also ran benchmarks with Rally before and after these changes, and wasn't able to detect an appreciable difference between these.

Subsequent work for this would involve migrating the rest of the Java source off of HPPC, and then experimenting with moving our collection use from `ImmutableObjectMap` to something like `HashMap`, or potentially backing `ImmutableObjectMap` with a built-in Java collection if we decided we want to keep it around. This PR was getting big enough that I felt it useful to open it as progress and for discussion rather than trying to do everything at once.

Co-authored-by: Joe Gallo <[email protected]>
  • Loading branch information
dakrone and joegallo authored Dec 3, 2021
1 parent 89a0005 commit 1a1961d
Show file tree
Hide file tree
Showing 73 changed files with 810 additions and 638 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.action.admin.indices.get;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest.Feature;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -23,6 +21,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_METADATA_BLOCK;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA;
Expand Down Expand Up @@ -287,8 +286,8 @@ private void assertEmptyMappings(GetIndexResponse response) {

private void assertEmptyAliases(GetIndexResponse response) {
assertThat(response.aliases(), notNullValue());
for (final ObjectObjectCursor<String, List<AliasMetadata>> entry : response.getAliases()) {
assertTrue(entry.value.isEmpty());
for (final Map.Entry<String, List<AliasMetadata>> entry : response.getAliases().entrySet()) {
assertTrue(entry.getValue().isEmpty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.elasticsearch.action.admin.indices.shards;

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Requests;
Expand Down Expand Up @@ -81,8 +78,8 @@ public void testBasic() throws Exception {
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStores = response.getStoreStatuses().get(index);
assertThat(shardStores.values().size(), equalTo(2));
for (ObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : shardStores.values()) {
for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.value) {
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : shardStores.entrySet()) {
for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.getValue()) {
assertThat(storeStatus.getAllocationId(), notNullValue());
assertThat(storeStatus.getNode(), notNullValue());
assertThat(storeStatus.getStoreException(), nullValue());
Expand All @@ -103,11 +100,11 @@ public void testBasic() throws Exception {
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStoresStatuses = response.getStoreStatuses().get(index);
assertThat(shardStoresStatuses.size(), equalTo(unassignedShards.size()));
for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> storesStatus : shardStoresStatuses) {
assertThat("must report for one store", storesStatus.value.size(), equalTo(1));
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> storesStatus : shardStoresStatuses.entrySet()) {
assertThat("must report for one store", storesStatus.getValue().size(), equalTo(1));
assertThat(
"reported store should be primary",
storesStatus.value.get(0).getAllocationStatus(),
storesStatus.getValue().get(0).getAllocationStatus(),
equalTo(IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY)
);
}
Expand Down Expand Up @@ -187,18 +184,18 @@ public void testCorruptedShards() throws Exception {
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStatuses = rsp.getStoreStatuses().get(index);
assertNotNull(shardStatuses);
assertThat(shardStatuses.size(), greaterThan(0));
for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStatus : shardStatuses) {
for (IndicesShardStoresResponse.StoreStatus status : shardStatus.value) {
if (corruptedShardIDMap.containsKey(shardStatus.key)
&& corruptedShardIDMap.get(shardStatus.key).contains(status.getNode().getName())) {
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStatus : shardStatuses.entrySet()) {
for (IndicesShardStoresResponse.StoreStatus status : shardStatus.getValue()) {
if (corruptedShardIDMap.containsKey(shardStatus.getKey())
&& corruptedShardIDMap.get(shardStatus.getKey()).contains(status.getNode().getName())) {
assertThat(
"shard [" + shardStatus.key + "] is failed on node [" + status.getNode().getName() + "]",
"shard [" + shardStatus.getKey() + "] is failed on node [" + status.getNode().getName() + "]",
status.getStoreException(),
notNullValue()
);
} else {
assertNull(
"shard [" + shardStatus.key + "] is not failed on node [" + status.getNode().getName() + "]",
"shard [" + shardStatus.getKey() + "] is not failed on node [" + status.getNode().getName() + "]",
status.getStoreException()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.aliases;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
Expand Down Expand Up @@ -46,6 +44,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -1018,8 +1017,8 @@ public void testIndicesGetAliases() throws Exception {
assertAcked(admin().indices().prepareAliases().removeAlias("foobar", "foo"));

getResponse = admin().indices().prepareGetAliases("foo").addIndices("foobar").get();
for (final ObjectObjectCursor<String, List<AliasMetadata>> entry : getResponse.getAliases()) {
assertTrue(entry.value.isEmpty());
for (final Map.Entry<String, List<AliasMetadata>> entry : getResponse.getAliases().entrySet()) {
assertTrue(entry.getValue().isEmpty());
}
assertTrue(admin().indices().prepareGetAliases("foo").addIndices("foobar").get().getAliases().isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
Expand Down Expand Up @@ -52,6 +50,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -145,9 +144,9 @@ public void testClusterStateDiffSerialization() throws Exception {
assertThat(clusterStateFromDiffs.nodes().getNodes(), equalTo(clusterState.nodes().getNodes()));
assertThat(clusterStateFromDiffs.nodes().getLocalNodeId(), equalTo(previousClusterStateFromDiffs.nodes().getLocalNodeId()));
assertThat(clusterStateFromDiffs.nodes().getNodes(), equalTo(clusterState.nodes().getNodes()));
for (ObjectCursor<String> node : clusterStateFromDiffs.nodes().getNodes().keys()) {
DiscoveryNode node1 = clusterState.nodes().get(node.value);
DiscoveryNode node2 = clusterStateFromDiffs.nodes().get(node.value);
for (Map.Entry<String, DiscoveryNode> node : clusterStateFromDiffs.nodes().getNodes().entrySet()) {
DiscoveryNode node1 = clusterState.nodes().get(node.getKey());
DiscoveryNode node2 = clusterStateFromDiffs.nodes().get(node.getKey());
assertThat(node1.getVersion(), equalTo(node2.getVersion()));
assertThat(node1.getAddress(), equalTo(node2.getAddress()));
assertThat(node1.getAttributes(), equalTo(node2.getAttributes()));
Expand Down Expand Up @@ -320,16 +319,16 @@ private IndexRoutingTable randomIndexRoutingTable(String index, String[] nodeIds
*/
private IndexRoutingTable randomChangeToIndexRoutingTable(IndexRoutingTable original, String[] nodes) {
IndexRoutingTable.Builder builder = IndexRoutingTable.builder(original.getIndex());
for (ObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : original.shards().values()) {
for (Map.Entry<Integer, IndexShardRoutingTable> indexShardRoutingTable : original.shards().entrySet()) {
Set<String> availableNodes = Sets.newHashSet(nodes);
for (ShardRouting shardRouting : indexShardRoutingTable.value.shards()) {
for (ShardRouting shardRouting : indexShardRoutingTable.getValue().shards()) {
availableNodes.remove(shardRouting.currentNodeId());
if (shardRouting.relocating()) {
availableNodes.remove(shardRouting.relocatingNodeId());
}
}

for (ShardRouting shardRouting : indexShardRoutingTable.value.shards()) {
for (ShardRouting shardRouting : indexShardRoutingTable.getValue().shards()) {
final ShardRouting updatedShardRouting = randomChange(shardRouting, availableNodes);
availableNodes.remove(updatedShardRouting.currentNodeId());
if (shardRouting.relocating()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.cluster.routing;

import com.carrotsearch.hppc.cursors.IntObjectCursor;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
Expand Down Expand Up @@ -50,6 +48,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -296,9 +295,9 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
.getStoreStatuses()
.get(idxName);
ClusterRerouteRequestBuilder rerouteBuilder = client().admin().cluster().prepareReroute();
for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : storeStatuses) {
int shardId = shardStoreStatuses.key;
IndicesShardStoresResponse.StoreStatus storeStatus = randomFrom(shardStoreStatuses.value);
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : storeStatuses.entrySet()) {
int shardId = shardStoreStatuses.getKey();
IndicesShardStoresResponse.StoreStatus storeStatus = randomFrom(shardStoreStatuses.getValue());
logger.info("--> adding allocation command for shard {}", shardId);
// force allocation based on node id
if (useStaleReplica) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import joptsimple.OptionParser;
import joptsimple.OptionSet;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -208,10 +207,10 @@ public Settings onNodeStopped(String nodeName) throws Exception {
String nodeId = null;
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final DiscoveryNodes nodes = state.nodes();
for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getNodes()) {
final String name = cursor.value.getName();
for (Map.Entry<String, DiscoveryNode> cursor : nodes.getNodes().entrySet()) {
final String name = cursor.getValue().getName();
if (name.equals(node)) {
nodeId = cursor.key;
nodeId = cursor.getKey();
break;
}
}
Expand Down Expand Up @@ -396,10 +395,10 @@ public Settings onNodeStopped(String nodeName) throws Exception {
String primaryNodeId = null;
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final DiscoveryNodes nodes = state.nodes();
for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getNodes()) {
final String name = cursor.value.getName();
for (Map.Entry<String, DiscoveryNode> cursor : nodes.getNodes().entrySet()) {
final String name = cursor.getValue().getName();
if (name.equals(node1)) {
primaryNodeId = cursor.key;
primaryNodeId = cursor.getKey();
break;
}
}
Expand Down Expand Up @@ -603,8 +602,8 @@ public void testResolvePath() throws Exception {
final Map<String, String> nodeNameToNodeId = new HashMap<>();
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final DiscoveryNodes nodes = state.nodes();
for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getNodes()) {
nodeNameToNodeId.put(cursor.value.getName(), cursor.key);
for (Map.Entry<String, DiscoveryNode> cursor : nodes.getNodes().entrySet()) {
nodeNameToNodeId.put(cursor.getValue().getName(), cursor.getKey());
}

final GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package org.elasticsearch.index.store;

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.apache.lucene.index.CheckIndex;
Expand Down Expand Up @@ -78,6 +77,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -636,9 +636,11 @@ public void testReplicaCorruption() throws Exception {

final IndicesShardStoresResponse stores = client().admin().indices().prepareShardStores(index.getName()).get();

for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shards : stores.getStoreStatuses().get(index.getName())) {
for (IndicesShardStoresResponse.StoreStatus store : shards.value) {
final ShardId shardId = new ShardId(index, shards.key);
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shards : stores.getStoreStatuses()
.get(index.getName())
.entrySet()) {
for (IndicesShardStoresResponse.StoreStatus store : shards.getValue()) {
final ShardId shardId = new ShardId(index, shards.getKey());
if (store.getAllocationStatus().equals(IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED)) {
for (Path path : findFilesToCorruptOnNode(store.getNode().getName(), shardId)) {
try (OutputStream os = Files.newOutputStream(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.indices.stats;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand Down Expand Up @@ -68,6 +66,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
Expand Down Expand Up @@ -822,12 +821,12 @@ public void testSegmentsStats() {
assertThat(stats.getTotal().getSegments().getCount(), equalTo((long) test1.totalNumShards));
if (includeSegmentFileSizes) {
assertThat(stats.getTotal().getSegments().getFiles().size(), greaterThan(0));
for (ObjectObjectCursor<String, SegmentsStats.FileStats> cursor : stats.getTotal().getSegments().getFiles()) {
assertThat(cursor.value.getExt(), notNullValue());
assertThat(cursor.value.getTotal(), greaterThan(0L));
assertThat(cursor.value.getCount(), greaterThan(0L));
assertThat(cursor.value.getMin(), greaterThan(0L));
assertThat(cursor.value.getMax(), greaterThan(0L));
for (Map.Entry<String, SegmentsStats.FileStats> cursor : stats.getTotal().getSegments().getFiles().entrySet()) {
assertThat(cursor.getValue().getExt(), notNullValue());
assertThat(cursor.getValue().getTotal(), greaterThan(0L));
assertThat(cursor.getValue().getCount(), greaterThan(0L));
assertThat(cursor.getValue().getMin(), greaterThan(0L));
assertThat(cursor.getValue().getMax(), greaterThan(0L));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

package org.elasticsearch.operateAllIndices;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;

import java.util.Map;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -91,8 +91,8 @@ public void testCloseIndexDefaultBehaviour() throws Exception {
}

ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ObjectObjectCursor<String, IndexMetadata> indexMetadataObjectObjectCursor : state.getMetadata().indices()) {
assertEquals(IndexMetadata.State.CLOSE, indexMetadataObjectObjectCursor.value.getState());
for (Map.Entry<String, IndexMetadata> indexMetadataEntry : state.getMetadata().indices().entrySet()) {
assertEquals(IndexMetadata.State.CLOSE, indexMetadataEntry.getValue().getState());
}
}

Expand Down Expand Up @@ -126,8 +126,8 @@ public void testOpenIndexDefaultBehaviour() throws Exception {
}

ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ObjectObjectCursor<String, IndexMetadata> indexMetadataObjectObjectCursor : state.getMetadata().indices()) {
assertEquals(IndexMetadata.State.OPEN, indexMetadataObjectObjectCursor.value.getState());
for (Map.Entry<String, IndexMetadata> indexMetadataEntry : state.getMetadata().indices().entrySet()) {
assertEquals(IndexMetadata.State.OPEN, indexMetadataEntry.getValue().getState());
}
}

Expand Down
Loading

0 comments on commit 1a1961d

Please sign in to comment.