Skip to content

Commit

Permalink
Use computeIfAbsent
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Sep 7, 2021
1 parent 7ca5255 commit 13bfe0c
Showing 1 changed file with 33 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
Expand All @@ -35,6 +34,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LazyInitializable;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -202,7 +202,8 @@ public PublicationContext newPublicationContext(ClusterStatePublicationEvent clu
}
}

private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node) {
final Version nodeVersion = node.getVersion();
final BytesStreamOutput bytesStream = new ReleasableBytesStreamOutput(bigArrays);
boolean success = false;
try {
Expand All @@ -212,14 +213,15 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
stream.setVersion(nodeVersion);
stream.writeBoolean(true);
clusterState.writeTo(stream);
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
}
final BytesReference serializedState = bytesStream.bytes();
final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream::close);
logger.trace(
"serialized full cluster state version [{}] for node version [{}] with size [{}]",
clusterState.version(),
nodeVersion,
serializedState.length());
final ReleasableBytesReference result = new ReleasableBytesReference(serializedState, bytesStream::close);
result.length());
success = true;
return result;
} finally {
Expand All @@ -229,23 +231,31 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
}
}

private ReleasableBytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
private ReleasableBytesReference serializeDiffClusterState(long clusterStateVersion, Diff<ClusterState> diff, DiscoveryNode node) {
final Version nodeVersion = node.getVersion();
final BytesStreamOutput bytesStream = new ReleasableBytesStreamOutput(bigArrays);
boolean success = false;
try {
try (StreamOutput stream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bStream)))
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream)))
) {
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
}
final ReleasableBytesReference result = new ReleasableBytesReference(bStream.bytes(), bStream::close);
final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream::close);
logger.trace(
"serialized cluster state diff for version [{}] for node version [{}] with size [{}]",
clusterStateVersion,
nodeVersion,
result.length());
success = true;
return result;
} finally {
if (success == false) {
bStream.close();
bytesStream.close();
}
}
}
Expand Down Expand Up @@ -277,27 +287,17 @@ public class PublicationContext extends AbstractRefCounted {

void buildDiffAndSerializeStates() {
assert refCount() > 0;
Diff<ClusterState> diff = null;
final LazyInitializable<Diff<ClusterState>, RuntimeException> diffSupplier
= new LazyInitializable<>(() -> newState.diff(previousState));
for (DiscoveryNode node : discoveryNodes) {
try {
if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
if (serializedStates.containsKey(node.getVersion()) == false) {
serializedStates.put(node.getVersion(), serializeFullClusterState(newState, node.getVersion()));
}
} else {
// will send a diff
if (diff == null) {
diff = newState.diff(previousState);
}
if (serializedDiffs.containsKey(node.getVersion()) == false) {
final ReleasableBytesReference serializedDiff = serializeDiffClusterState(diff, node.getVersion());
serializedDiffs.put(node.getVersion(), serializedDiff);
logger.trace("serialized cluster state diff for version [{}] in for node version [{}] with size [{}]",
newState.version(), node.getVersion(), serializedDiff.length());
}
}
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
serializedStates.computeIfAbsent(
node.getVersion(),
v -> serializeFullClusterState(newState, node));
} else {
serializedDiffs.computeIfAbsent(
node.getVersion(),
v -> serializeDiffClusterState(newState.version(), diffSupplier.getOrCompute(), node));
}
}
}
Expand Down Expand Up @@ -355,13 +355,9 @@ private void sendFullClusterState(DiscoveryNode destination, ActionListener<Publ
ReleasableBytesReference bytes = serializedStates.get(destination.getVersion());
if (bytes == null) {
try {
bytes = serializeFullClusterState(newState, destination.getVersion());
final ReleasableBytesReference existingBytes = serializedStates.putIfAbsent(destination.getVersion(), bytes);
if (existingBytes != null) {
assert existingBytes.refCount() > 0;
bytes.decRef();
bytes = existingBytes;
}
bytes = serializedStates.computeIfAbsent(
destination.getVersion(),
v -> serializeFullClusterState(newState, destination));
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage(
"failed to serialize cluster state before publishing it to node {}", destination), e);
Expand Down

0 comments on commit 13bfe0c

Please sign in to comment.