From 081195988c50c7d5cd67c9b697afc9e2da957df5 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 17 Jun 2022 12:40:06 -0700 Subject: [PATCH] PR Feedback: - Make FileChunkWriter a functional interface. - simplify logic in OngoingSegmentReplications.prepareForReplication. - Rename segmentReplicationSourceHandler.isActive to isReplicating. Signed-off-by: Marc Handalian --- .../org/opensearch/indices/recovery/FileChunkWriter.java | 1 + .../indices/replication/OngoingSegmentReplications.java | 9 +++++---- .../replication/SegmentReplicationSourceService.java | 1 - server/src/main/java/org/opensearch/node/Node.java | 3 +++ 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java index 073442e9d58ae..cb43af3b82e09 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java @@ -17,6 +17,7 @@ * * @opensearch.internal */ +@FunctionalInterface public interface FileChunkWriter { void writeFileChunk( diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 679c56fbc79c3..6302d364fc6d1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -145,14 +145,16 @@ void cancelReplication(DiscoveryNode node) { */ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException { final CopyState copyState = getCachedCopyState(request.getCheckpoint()); - if (nodesToHandlers.containsKey(request.getTargetNode())) { + if (nodesToHandlers.putIfAbsent( + request.getTargetNode(), + createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter) + ) != null) { throw new OpenSearchException( "Shard copy {} on node {} already replicating", request.getCheckpoint().getShardId(), request.getTargetNode() ); } - nodesToHandlers.computeIfAbsent(request.getTargetNode(), node -> createTargetHandler(node, copyState, fileChunkWriter)); return copyState; } @@ -221,8 +223,7 @@ private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpo * @param copyState {@link CopyState} */ private synchronized void removeCopyState(CopyState copyState) { - copyState.decRef(); - if (copyState.refCount() <= 0) { + if (copyState.decRef() == true) { copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 227acbc83674d..d428459884f97 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -60,7 +60,6 @@ public static class Actions { private final OngoingSegmentReplications ongoingSegmentReplications; - // TODO mark this as injected and bind in Node public SegmentReplicationSourceService( IndicesService indicesService, TransportService transportService, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7e205b88e9eb1..b9ac0e610218b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -37,6 +37,7 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; import org.opensearch.index.IndexingPressureService; +import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -932,6 +933,8 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); + b.bind(SegmentReplicationSourceService.class) + .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));