Skip to content

Commit

Permalink
[7.x] Add support of getting a Java stream on ImmutableOpenMap (elast…
Browse files Browse the repository at this point in the history
…ic#76921)

`ImmutableOpenMap` doesn't support Java streams directly, so consumer have
to invoke `StreamSupport.spliterator` on it if they want to run a Java
stream over it. We can simplify consumer code by providing a direct
`stream` method in `ImmutableOpenMap`
  • Loading branch information
arteam committed Sep 8, 2021
1 parent 609bc82 commit cbcbf33
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
Expand Down Expand Up @@ -159,8 +158,8 @@ public void testRelocation() throws Exception {
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertThat(resp.pointInTimeId(), equalTo(pitId));
final Set<String> dataNodes = StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(e -> e.value.getId()).collect(Collectors.toSet());
final Set<String> dataNodes = clusterService().state().nodes().getDataNodes().stream()
.map(e -> e.getValue().getId()).collect(Collectors.toSet());
final List<String> excludedNodes = randomSubsetOf(2, dataNodes);
assertAcked(client().admin().indices().prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", String.join(",", excludedNodes)).build()));
Expand Down Expand Up @@ -311,9 +310,8 @@ public void testCanMatch() throws Exception {

public void testPartialResults() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
final List<String> dataNodes =
StreamSupport.stream(internalCluster().clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(e -> e.value.getName())
final List<String> dataNodes = internalCluster().clusterService().state().nodes().getDataNodes().stream()
.map(e -> e.getValue().getName())
.collect(Collectors.toList());
final String assignedNodeForIndex1 = randomFrom(dataNodes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.ReplicaShardAllocatorIT;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -1343,8 +1344,8 @@ public Settings onNodeStopped(String nodeName) throws Exception {

public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
List<String> nodes = randomSubsetOf(2, StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(node -> node.value.getName()).collect(Collectors.toSet()));
List<String> nodes = randomSubsetOf(2, clusterService().state().nodes().getDataNodes().stream()
.map(node -> node.getValue().getName()).collect(Collectors.toSet()));
String indexName = "test-index";
createIndex(indexName, Settings.builder()
.put("index.number_of_shards", 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Get index action.
Expand Down Expand Up @@ -63,8 +63,9 @@ protected void doMasterOperation(Task task, final GetIndexRequest request, Strin
ImmutableOpenMap<String, Settings> settings = ImmutableOpenMap.of();
ImmutableOpenMap<String, Settings> defaultSettings = ImmutableOpenMap.of();
ImmutableOpenMap<String, String> dataStreams = ImmutableOpenMap.<String, String>builder()
.putAll(StreamSupport.stream(state.metadata().findDataStreams(concreteIndices).spliterator(), false)
.collect(Collectors.toMap(k -> k.key, v -> v.value.getName()))).build();
.putAll(state.metadata().findDataStreams(concreteIndices).stream()
.collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getName())))
.build();
Feature[] features = request.features();
boolean doneAliases = false;
boolean doneMappings = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;

Expand Down Expand Up @@ -649,7 +648,7 @@ public Builder removeCustom(String type) {
}

public Builder customs(ImmutableOpenMap<String, Custom> customs) {
StreamSupport.stream(customs.spliterator(), false).forEach(cursor -> Objects.requireNonNull(cursor.value, cursor.key));
customs.stream().forEach(entry -> Objects.requireNonNull(entry.getValue(), entry.getKey()));
this.customs.putAll(customs);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.util.CollectionUtil;
Expand Down Expand Up @@ -74,7 +75,6 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;
Expand Down Expand Up @@ -1436,7 +1436,7 @@ public Builder removeCustom(String type) {
}

public Builder customs(ImmutableOpenMap<String, Custom> customs) {
StreamSupport.stream(customs.spliterator(), false).forEach(cursor -> Objects.requireNonNull(cursor.value, cursor.key));
customs.stream().forEach(entry -> Objects.requireNonNull(entry.getValue(), entry.getKey()));
this.customs.putAll(customs);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -169,7 +170,8 @@ public Collection<DiscoveryNode> getAllNodes() {
* Returns a stream of all nodes, with master nodes at the front
*/
public Stream<DiscoveryNode> mastersFirstStream() {
return Stream.concat(StreamSupport.stream(masterNodes.spliterator(), false).map(cur -> cur.value),
return Stream.concat(
masterNodes.stream().map(Map.Entry::getValue),
StreamSupport.stream(this.spliterator(), false).filter(n -> n.isMasterNode() == false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -41,13 +42,13 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Listens for a node to go over the high watermark and kicks off an empty
Expand Down Expand Up @@ -307,9 +308,8 @@ public void onNewInfo(ClusterInfo info) {
logger.trace("no reroute required");
listener.onResponse(null);
}
final Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting()
.spliterator(), false)
.map(c -> c.key)
final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().stream()
.map(Map.Entry::getKey)
.filter(index -> indicesNotToAutoRelease.contains(index) == false)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@
import com.carrotsearch.hppc.predicates.ObjectPredicate;
import com.carrotsearch.hppc.procedures.ObjectObjectProcedure;

import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* An immutable map implementation based on open hash map.
Expand All @@ -36,7 +42,6 @@ public final class ImmutableOpenMap<KType, VType> implements Iterable<ObjectObje
private ImmutableOpenMap(ObjectObjectHashMap<KType, VType> map) {
this.map = map;
}

/**
* @return Returns the value associated with the given key or the default value
* for the key type, if the key is not associated with any value.
Expand Down Expand Up @@ -163,6 +168,27 @@ public void remove() {
};
}

/**
* Returns a sequential unordered stream of the map entries.
*
* @return a {@link Stream} of the map entries as {@link Map.Entry}
*/
public Stream<Map.Entry<KType, VType>> stream() {
final Iterator<ObjectObjectCursor<KType, VType>> mapIterator = map.iterator();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<Map.Entry<KType, VType>>(map.size(),
Spliterator.SIZED | Spliterator.DISTINCT) {
@Override
public boolean tryAdvance(Consumer<? super Map.Entry<KType, VType>> action) {
if (mapIterator.hasNext() == false) {
return false;
}
ObjectObjectCursor<KType, VType> cursor = mapIterator.next();
action.accept(new AbstractMap.SimpleImmutableEntry<>(cursor.key, cursor.value));
return true;
}
}, false);
}

@Override
public String toString() {
return map.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.gateway.AsyncShardFetch.Lister;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata;

Expand All @@ -37,7 +39,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class GatewayAllocator implements ExistingShardsAllocator {

Expand Down Expand Up @@ -165,8 +166,8 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting
private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {
DiscoveryNodes nodes = allocation.nodes();
if (hasNewNodes(nodes)) {
final Set<String> newEphemeralIds = StreamSupport.stream(nodes.getDataNodes().spliterator(), false)
.map(node -> node.value.getEphemeralId()).collect(Collectors.toSet());
final Set<String> newEphemeralIds = nodes.getDataNodes().stream()
.map(node -> node.getValue().getEphemeralId()).collect(Collectors.toSet());
// Invalidate the cache if a data node has been added to the cluster. This ensures that we do not cancel a recovery if a node
// drops out, we fetch the shard data, then some indexing happens and then the node rejoins the cluster again. There are other
// ways we could decide to cancel a recovery based on stale data (e.g. changing allocation filters or a primary failure) but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ private GroupedActionListener<ActionResponse> createGroupedListener(final RestRe
public void onResponse(final Collection<ActionResponse> responses) {
try {
GetSettingsResponse settingsResponse = extractResponse(responses, GetSettingsResponse.class);
Map<String, Settings> indicesSettings = StreamSupport.stream(settingsResponse.getIndexToSettings().spliterator(), false)
.collect(Collectors.toMap(cursor -> cursor.key, cursor -> cursor.value));
Map<String, Settings> indicesSettings = settingsResponse.getIndexToSettings().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

ClusterStateResponse stateResponse = extractResponse(responses, ClusterStateResponse.class);
Map<String, IndexMetadata> indicesStates =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.collect;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.common.Randomness;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;

public class ImmutableOpenMapTests extends ESTestCase {

ImmutableOpenMap<String, String> regionCurrencySymbols = ImmutableOpenMap.<String, String>builder()
.fPut("Japan", "¥")
.fPut("USA", "$")
.fPut("EU", "€")
.fPut("UK", "£")
.fPut("Korea", "₩")
.build();

public void testStreamOperationsAreSupported() {
assertThat(regionCurrencySymbols.stream().filter(e -> e.getKey().startsWith("U")).map(Map.Entry::getValue)
.collect(Collectors.toSet()), equalTo(Set.of("£", "$")));
}

public void testSortedStream() {
assertThat(regionCurrencySymbols.stream().sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue).collect(Collectors.toList()),
equalTo(List.of("€", "¥", "₩", "£", "$")));
}

public void testStreamOperationsOnRandomMap() {
ImmutableOpenMap<Long, String> map = Randomness.get().longs(randomIntBetween(1, 1000))
.mapToObj(e -> Tuple.tuple(e, randomAlphaOfLength(8)))
.collect(() -> ImmutableOpenMap.<Long, String>builder(), (builder, t) -> builder.fPut(t.v1(), t.v2()),
ImmutableOpenMap.Builder::putAll)
.build();

int limit = randomIntBetween(0, map.size());
Map<Long, List<String>> collectedViaStreams = map.stream()
.filter(e -> e.getKey() > 0)
.sorted(Map.Entry.comparingByKey())
.limit(limit)
.collect(Collectors.groupingBy(e -> e.getKey() % 2, Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

Map<Long, String> sortedMap = new TreeMap<>();
for (ObjectObjectCursor<Long, String> cursor : map) {
if (cursor.key > 0) {
sortedMap.put(cursor.key, cursor.value);
}
}
int i = 0;
Map<Long, List<String>> collectedIteratively = new HashMap<>();
for (Map.Entry<Long, String> e : sortedMap.entrySet()) {
if (i++ >= limit) {
break;
}
collectedIteratively.computeIfAbsent(e.getKey() % 2, k -> new ArrayList<>()).add(e.getValue());
}

assertThat(collectedViaStreams, equalTo(collectedIteratively));
}

public void testEmptyStreamWorks() {
assertThat(ImmutableOpenMap.of().stream().count(), equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -217,10 +216,13 @@ public void testTimestampFieldTypeExposedByAllIndicesServices() throws Exception

public void testRetryPointInTime() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(1);
final List<String> dataNodes = StreamSupport.stream(
internalCluster().clusterService().state().nodes().getDataNodes().spliterator(),
false
).map(e -> e.value.getName()).collect(Collectors.toList());
final List<String> dataNodes = internalCluster().clusterService()
.state()
.nodes()
.getDataNodes()
.stream()
.map(e -> e.getValue().getName())
.collect(Collectors.toList());
final String assignedNode = randomFrom(dataNodes);
final String indexName = "test";
assertAcked(
Expand Down
Loading

0 comments on commit cbcbf33

Please sign in to comment.