Skip to content

Commit

Permalink
Expose ImmutableOpenMap.values as a Java Collection (elastic#78529)
Browse files Browse the repository at this point in the history
elastic#76921 added support for getting entries as a Java stream, elastic#77897 added the ability to get keys as a Set. This PR adds the values method which returns a Collection to bring the ImmutableOpenMap API closer to java.util.Map
  • Loading branch information
arteam authored Oct 5, 2021
1 parent 19d12f1 commit cb73e3f
Show file tree
Hide file tree
Showing 84 changed files with 340 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -105,10 +104,10 @@ public void testParsingFromEsResponse() throws IOException {
assertThat(result.mappings().sourceAsMap(), equalTo(expectedMapping.get("_doc")));

assertThat(result.aliases().size(), equalTo(esIMD.aliases().size()));
List<AliasMetadata> expectedAliases = Arrays.stream(esIMD.aliases().values().toArray(AliasMetadata.class))
List<AliasMetadata> expectedAliases = esIMD.aliases().values().stream()
.sorted(Comparator.comparing(AliasMetadata::alias))
.collect(Collectors.toList());
List<AliasMetadata> actualAliases = Arrays.stream(result.aliases().values().toArray(AliasMetadata.class))
List<AliasMetadata> actualAliases = result.aliases().values().stream()
.sorted(Comparator.comparing(AliasMetadata::alias))
.collect(Collectors.toList());
for (int j = 0; j < result.aliases().size(); j++) {
Expand Down Expand Up @@ -186,8 +185,7 @@ static void toXContent(GetIndexTemplatesResponse response, XContentBuilder build

serverTemplateBuilder.patterns(clientITMD.patterns());

Iterator<AliasMetadata> aliases = clientITMD.aliases().valuesIt();
aliases.forEachRemaining((a)->serverTemplateBuilder.putAlias(a));
clientITMD.aliases().values().forEach(serverTemplateBuilder::putAlias);

serverTemplateBuilder.settings(clientITMD.settings());
serverTemplateBuilder.order(clientITMD.order());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -51,7 +52,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -398,8 +398,8 @@ private void putPipeline() throws IOException {
}

private List<Path> getGeoIpTmpDirs() throws IOException {
final Set<String> ids = StreamSupport.stream(clusterService().state().nodes().getDataNodes().values().spliterator(), false)
.map(c -> c.value.getId())
final Set<String> ids = clusterService().state().nodes().getDataNodes().values().stream()
.map(DiscoveryNode::getId)
.collect(Collectors.toSet());
// All nodes share the same geoip base dir in the shared tmp dir:
Path geoipBaseTmpDir = internalCluster().getDataNodeInstance(Environment.class).tmpFile().resolve("geoip-databases");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void testCreateShrinkIndexToN() {
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
.getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode[]::new);
String mergeNode = discoveryNodes[0].getName();
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testShrinkIndexPrimaryTerm() throws Exception {
final ImmutableOpenMap<String, DiscoveryNode> dataNodes =
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
assertThat(dataNodes.size(), greaterThanOrEqualTo(2));
final DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
final DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode[]::new);
final String mergeNode = discoveryNodes[0].getName();
// This needs more than the default timeout if a large number of shards were created.
ensureGreen(TimeValue.timeValueSeconds(120));
Expand Down Expand Up @@ -239,7 +239,7 @@ public void testCreateShrinkIndex() {
ImmutableOpenMap<String, DiscoveryNode> dataNodes =
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode[]::new);
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
// to the require._name below.
Expand Down Expand Up @@ -340,7 +340,7 @@ public void testCreateShrinkIndexFails() throws Exception {
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
.getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode[]::new);
String spareNode = discoveryNodes[0].getName();
String mergeNode = discoveryNodes[1].getName();
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
Expand Down Expand Up @@ -419,7 +419,7 @@ public void testCreateShrinkWithIndexSort() throws Exception {
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
.getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode[]::new);
String mergeNode = discoveryNodes[0].getName();
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
Expand Down Expand Up @@ -481,7 +481,7 @@ public void testShrinkCommitsMergeOnIdle() throws Exception {
client().admin().indices().prepareFlush("source").get();
ImmutableOpenMap<String, DiscoveryNode> dataNodes =
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode[]::new);
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
// to the require._name below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
Expand Down Expand Up @@ -159,20 +158,20 @@ public void testClusterInfoServiceCollectsInformation() {
assertNotNull(shardDataSetSizes);
assertThat("some usages are populated", leastUsages.values().size(), Matchers.equalTo(2));
assertThat("some shard sizes are populated", shardSizes.values().size(), greaterThan(0));
for (ObjectCursor<DiskUsage> usage : leastUsages.values()) {
logger.info("--> usage: {}", usage.value);
assertThat("usage has be retrieved", usage.value.getFreeBytes(), greaterThan(0L));
for (DiskUsage usage : leastUsages.values()) {
logger.info("--> usage: {}", usage);
assertThat("usage has be retrieved", usage.getFreeBytes(), greaterThan(0L));
}
for (ObjectCursor<DiskUsage> usage : mostUsages.values()) {
logger.info("--> usage: {}", usage.value);
assertThat("usage has be retrieved", usage.value.getFreeBytes(), greaterThan(0L));
for (DiskUsage usage : mostUsages.values()) {
logger.info("--> usage: {}", usage);
assertThat("usage has be retrieved", usage.getFreeBytes(), greaterThan(0L));
}
for (ObjectCursor<Long> size : shardSizes.values()) {
logger.info("--> shard size: {}", size.value);
assertThat("shard size is greater than 0", size.value, greaterThanOrEqualTo(0L));
for (Long size : shardSizes.values()) {
logger.info("--> shard size: {}", size);
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
}
for (ObjectCursor<Long> size : shardDataSetSizes.values()) {
assertThat("shard data set size is greater than 0", size.value, greaterThanOrEqualTo(0L));
for (Long size : shardDataSetSizes.values()) {
assertThat("shard data set size is greater than 0", size, greaterThanOrEqualTo(0L));
}

ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getMasterName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -19,7 +20,6 @@
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
Expand Down Expand Up @@ -53,9 +53,9 @@ public void testReplicaRemovalPriority() throws Exception {
});
}

final String dataNodeIdFilter = StreamSupport.stream(client().admin().cluster().prepareState().clear().setNodes(true).get()
.getState().nodes().getDataNodes().values().spliterator(), false)
.map(c -> c.value.getId()).limit(3).collect(Collectors.joining(","));
final String dataNodeIdFilter = client().admin().cluster().prepareState().clear().setNodes(true).get()
.getState().nodes().getDataNodes().values().stream()
.map(DiscoveryNode::getId).limit(3).collect(Collectors.joining(","));
final String excludedDataNodeId = dataNodeIdFilter.substring(0, dataNodeIdFilter.indexOf(','));

createIndex(INDEX_NAME, Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import static org.elasticsearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -206,8 +205,8 @@ private void refreshDiskUsage() {
ClusterInfoServiceUtils.refresh(((InternalClusterInfoService) clusterInfoService));
// if the nodes were all under the low watermark already (but unbalanced) then a change in the disk usage doesn't trigger a reroute
// even though it's now possible to achieve better balance, so we have to do an explicit reroute. TODO fix this?
if (StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeMostAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() > WATERMARK_BYTES)) {
if (clusterInfoService.getClusterInfo().getNodeMostAvailableDiskUsages().values().stream()
.allMatch(e -> e.getFreeBytes() > WATERMARK_BYTES)) {
assertAcked(client().admin().cluster().prepareReroute());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
Expand Down Expand Up @@ -133,8 +132,7 @@ private Map<String, long[]> assertAndCapturePrimaryTerms(Map<String, long[]> pre
}
final Map<String, long[]> result = new HashMap<>();
final ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ObjectCursor<IndexMetadata> cursor : state.metadata().indices().values()) {
final IndexMetadata indexMetadata = cursor.value;
for (IndexMetadata indexMetadata : state.metadata().indices().values()) {
final String index = indexMetadata.getIndex().getName();
final long[] previous = previousTerms.get(index);
final long[] current = IntStream.range(0, indexMetadata.getNumberOfShards()).mapToLong(indexMetadata::primaryTerm).toArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void run() {
indexingThread.start();

ClusterState initialState = client().admin().cluster().prepareState().get().getState();
DiscoveryNode[] dataNodes = initialState.getNodes().getDataNodes().values().toArray(DiscoveryNode.class);
DiscoveryNode[] dataNodes = initialState.getNodes().getDataNodes().values().toArray(DiscoveryNode[]::new);
DiscoveryNode relocationSource = initialState.getNodes().getDataNodes().get(initialState.getRoutingTable()
.shardRoutingTable("test", 0).primaryShard().currentNodeId());
for (int i = 0; i < RELOCATION_COUNT; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -379,8 +378,8 @@ public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception {
public void testRecoverExistingReplica() throws Exception {
final String indexName = "test-recover-existing-replica";
internalCluster().ensureAtLeastNumDataNodes(2);
List<String> dataNodes = randomSubsetOf(2, Sets.newHashSet(
clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet()));
List<String> dataNodes = randomSubsetOf(2, clusterService().state().nodes().getDataNodes().values()
.stream().map(DiscoveryNode::getName).collect(Collectors.toSet()));
createIndex(indexName, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testShrinking() throws Exception {
client().admin().indices().prepareUpdateSettings(index)
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", client().admin().cluster().prepareState().get().getState().nodes()
.getDataNodes().values().toArray(DiscoveryNode.class)[0].getName())
.getDataNodes().values().toArray(DiscoveryNode[]::new)[0].getName())
.put("index.blocks.write", true)).get();
ensureGreen();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
*/
package org.elasticsearch.snapshots;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
Expand Down Expand Up @@ -684,8 +682,8 @@ public void testStartCloneDuringRunningDelete() throws Exception {
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).forRepo(repoName)) {
if (entry.shardsByRepoShardId().isEmpty() == false) {
assertEquals(sourceSnapshot, entry.source().getName());
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> value : entry.shardsByRepoShardId().values()) {
assertSame(value.value, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED);
for (SnapshotsInProgress.ShardSnapshotStatus value : entry.shardsByRepoShardId().values()) {
assertSame(value, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
*/
package org.elasticsearch.snapshots;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
Expand Down Expand Up @@ -1996,8 +1994,8 @@ private void createIndexWithContent(String indexName, String nodeInclude, String
private static boolean snapshotHasCompletedShard(String repoName, String snapshot, SnapshotsInProgress snapshotsInProgress) {
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repoName)) {
if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) {
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards().values()) {
if (shard.value.state().completed()) {
for (SnapshotsInProgress.ShardSnapshotStatus shard : entry.shards().values()) {
if (shard.state().completed()) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.action.admin.cluster.snapshots.status;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -127,9 +126,9 @@ protected void masterOperation(

Set<String> nodesIds = new HashSet<>();
for (SnapshotsInProgress.Entry entry : currentSnapshots) {
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> status : entry.shardsByRepoShardId().values()) {
if (status.value.nodeId() != null) {
nodesIds.add(status.value.nodeId());
for (SnapshotsInProgress.ShardSnapshotStatus status : entry.shardsByRepoShardId().values()) {
if (status.nodeId() != null) {
nodesIds.add(status.nodeId());
}
}
}
Expand Down
Loading

0 comments on commit cb73e3f

Please sign in to comment.