From 998808900cf09ae1539cca2cce12a8c89d52f9d9 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Tue, 15 Mar 2022 16:25:56 -0700 Subject: [PATCH] Changing AutoCloseableRefCounted to a generic class This is in response to PR feedback, and helps avoid a verbose get() method by introducing the generic at the class level. Signed-off-by: Kartik Ganesh --- .../concurrent/AutoCloseableRefCounted.java | 10 ++++----- .../recovery/PeerRecoveryTargetService.java | 22 +++++++++---------- .../recovery/RecoveriesCollection.java | 4 ++-- .../SegmentReplicationReplicaService.java | 4 ++-- .../copy/PrimaryShardReplicationSource.java | 4 ++-- .../copy/ReplicationCollection.java | 4 ++-- .../AutoCloseableRefCountedTests.java | 6 ++--- .../recovery/RecoveriesCollectionTests.java | 12 ++++------ 8 files changed, 31 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/concurrent/AutoCloseableRefCounted.java b/server/src/main/java/org/opensearch/common/concurrent/AutoCloseableRefCounted.java index 86bcdd3838e23..795d352542881 100644 --- a/server/src/main/java/org/opensearch/common/concurrent/AutoCloseableRefCounted.java +++ b/server/src/main/java/org/opensearch/common/concurrent/AutoCloseableRefCounted.java @@ -19,18 +19,18 @@ * Adapter class that enables a {@link RefCounted} implementation to function like an {@link AutoCloseable}. * The {@link #close()} API invokes {@link RefCounted#decRef()} and ensures idempotency using a {@link OneWayGate}. */ -public class AutoCloseableRefCounted implements AutoCloseable { +public class AutoCloseableRefCounted implements AutoCloseable { - private final RefCounted ref; + private final T ref; private final OneWayGate gate; - public AutoCloseableRefCounted(RefCounted ref) { + public AutoCloseableRefCounted(T ref) { this.ref = ref; gate = new OneWayGate(); } - public T get(Class returnType) { - return (T) ref; + public T get() { + return ref; } @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index bf73aa2692e0c..96a9599cbd822 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -225,7 +225,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId); return; } - final RecoveryTarget recoveryTarget = recoveryRef.get(RecoveryTarget.class); + final RecoveryTarget recoveryTarget = recoveryRef.get(); timer = recoveryTarget.state().getTimer(); cancellableThreads = recoveryTarget.cancellableThreads(); if (preExistingRequest == null) { @@ -377,7 +377,7 @@ public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, if (listener == null) { return; } - recoveryRef.get(RecoveryTarget.class).prepareForTranslogOperations(request.totalTranslogOps(), listener); + recoveryRef.get().prepareForTranslogOperations(request.totalTranslogOps(), listener); } } } @@ -391,7 +391,7 @@ public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportCh if (listener == null) { return; } - recoveryRef.get(RecoveryTarget.class).finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener); + recoveryRef.get().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener); } } } @@ -402,7 +402,7 @@ class HandoffPrimaryContextRequestHandler implements TransportRequestHandler listener = createOrFinishListener( recoveryRef, channel, @@ -436,7 +436,7 @@ private void performTranslogOps( final ActionListener listener, final RecoveryRef recoveryRef ) { - final RecoveryTarget recoveryTarget = recoveryRef.get(RecoveryTarget.class); + final RecoveryTarget recoveryTarget = recoveryRef.get(); final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); final Consumer retryOnMappingException = exception -> { @@ -501,7 +501,7 @@ public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel c return; } - recoveryRef.get(RecoveryTarget.class) + recoveryRef.get() .receiveFileInfo( request.phase1FileNames, request.phase1FileSizes, @@ -524,7 +524,7 @@ public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel return; } - recoveryRef.get(RecoveryTarget.class) + recoveryRef.get() .cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(), listener); } } @@ -538,7 +538,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request); if (listener == null) { return; @@ -588,7 +588,7 @@ private ActionListener createOrFinishListener( final RecoveryTransportRequest request, final CheckedFunction responseFn ) { - final RecoveryTarget recoveryTarget = recoveryRef.get(RecoveryTarget.class); + final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener channelListener = new ChannelActionListener<>(channel, action, request); final ActionListener voidListener = ActionListener.map(channelListener, responseFn); @@ -622,7 +622,7 @@ public void onFailure(Exception e) { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { if (recoveryRef != null) { logger.error(() -> new ParameterizedMessage("unexpected error during recovery [{}], failing shard", recoveryId), e); - RecoveryTarget recoveryTarget = recoveryRef.get(RecoveryTarget.class); + RecoveryTarget recoveryTarget = recoveryRef.get(); onGoingRecoveries.failRecovery( recoveryId, new RecoveryFailedException(recoveryTarget.state(), "unexpected error", e), diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveriesCollection.java index 569970471611d..531782962d98b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveriesCollection.java @@ -183,7 +183,7 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) { if (recoveryRef == null) { throw new IndexShardClosedException(shardId); } - RecoveryTarget recoveryTarget = recoveryRef.get(RecoveryTarget.class); + RecoveryTarget recoveryTarget = recoveryRef.get(); assert recoveryTarget.indexShard().shardId().equals(shardId); return recoveryRef; } @@ -284,7 +284,7 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { * causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources * will not be freed until {@link RecoveryRef#close()} is called. */ - public static class RecoveryRef extends AutoCloseableRefCounted { + public static class RecoveryRef extends AutoCloseableRefCounted { /** * Important: {@link RecoveryTarget#tryIncRef()} should diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java index 1d3604edd6d15..b1334818e3b23 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java @@ -181,7 +181,7 @@ private void doReplication(final long replicationId) { logger.trace("not running replication with id [{}] - can not find it (probably finished)", replicationId); return; } - final SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class); + final SegmentReplicationTarget replicationTarget = replicationRef.get(); timer = replicationTarget.state().getTimer(); final IndexShard indexShard = replicationTarget.indexShard(); @@ -218,7 +218,7 @@ public void onFailure(Exception e) { () -> new ParameterizedMessage("unexpected error during replication [{}], failing shard", replicationId), e ); - SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class); + SegmentReplicationTarget replicationTarget = replicationRef.get(); onGoingReplications.failReplication( replicationId, new ReplicationFailedException(replicationTarget.indexShard(), "unexpected error", e), diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java index d51cfad35145b..8083b76f910f0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java @@ -231,7 +231,7 @@ public void messageReceived(final ReplicationFileChunkRequest request, Transport ReplicationCollection.ReplicationRef replicationRef = segmentReplicationReplicaService.getOnGoingReplications() .getReplicationSafe(request.getReplicationId(), request.shardId()) ) { - final SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class); + final SegmentReplicationTarget replicationTarget = replicationRef.get(); final ActionListener listener = createOrFinishListener(replicationRef, channel, Actions.FILE_CHUNK, request); if (listener == null) { return; @@ -274,7 +274,7 @@ private ActionListener createOrFinishListener( final ReplicationFileChunkRequest request, final CheckedFunction responseFn ) { - final SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class); + final SegmentReplicationTarget replicationTarget = replicationRef.get(); final ActionListener channelListener = new ChannelActionListener<>(channel, action, request); final ActionListener voidListener = ActionListener.map(channelListener, responseFn); diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java index 76094cd1672af..02b1f6605beac 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java @@ -121,7 +121,7 @@ public ReplicationRef getReplicationSafe(long id, ShardId shardId) { if (replicationRef == null) { throw new IndexShardClosedException(shardId); } - SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class); + SegmentReplicationTarget replicationTarget = replicationRef.get(); assert replicationTarget.indexShard().shardId().equals(shardId); return replicationRef; } @@ -208,7 +208,7 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { * causes {@link SegmentReplicationTarget#decRef()} to be called. This makes sure that the underlying resources * will not be freed until {@link ReplicationRef#close()} is called. */ - public static class ReplicationRef extends AutoCloseableRefCounted { + public static class ReplicationRef extends AutoCloseableRefCounted { /** * Important: {@link SegmentReplicationTarget#tryIncRef()} should diff --git a/server/src/test/java/org/opensearch/common/concurrent/AutoCloseableRefCountedTests.java b/server/src/test/java/org/opensearch/common/concurrent/AutoCloseableRefCountedTests.java index d0604680e5443..344368988f5ff 100644 --- a/server/src/test/java/org/opensearch/common/concurrent/AutoCloseableRefCountedTests.java +++ b/server/src/test/java/org/opensearch/common/concurrent/AutoCloseableRefCountedTests.java @@ -24,16 +24,16 @@ public class AutoCloseableRefCountedTests extends OpenSearchTestCase { private RefCounted mockRefCounted; - private AutoCloseableRefCounted testObject; + private AutoCloseableRefCounted testObject; @Before public void setup() { mockRefCounted = mock(RefCounted.class); - testObject = new AutoCloseableRefCounted(mockRefCounted); + testObject = new AutoCloseableRefCounted<>(mockRefCounted); } public void testGet() { - assertEquals(mockRefCounted, testObject.get(RefCounted.class)); + assertEquals(mockRefCounted, testObject.get()); } public void testClose() { diff --git a/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java index e6be95a1eba58..86edf15d42e68 100644 --- a/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java @@ -69,14 +69,10 @@ public void testLastAccessTimeUpdate() throws Exception { final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) { - final long lastSeenTime = status.get(RecoveryTarget.class).lastAccessTime(); + final long lastSeenTime = status.get().lastAccessTime(); assertBusy(() -> { try (RecoveriesCollection.RecoveryRef currentStatus = collection.getRecovery(recoveryId)) { - assertThat( - "access time failed to update", - lastSeenTime, - lessThan(currentStatus.get(RecoveryTarget.class).lastAccessTime()) - ); + assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.get().lastAccessTime())); } }); } finally { @@ -124,7 +120,7 @@ public void testRecoveryCancellation() throws Exception { final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) { - ShardId shardId = recoveryRef.get(RecoveryTarget.class).indexShard().shardId(); + ShardId shardId = recoveryRef.get().indexShard().shardId(); assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test")); assertThat("all recoveries should be cancelled", collection.size(), equalTo(0)); } finally { @@ -164,7 +160,7 @@ public void testResetRecovery() throws Exception { assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget()); try (RecoveriesCollection.RecoveryRef newRecoveryRef = collection.getRecovery(resetRecoveryId)) { shards.recoverReplica(shard, (s, n) -> { - RecoveryTarget newRecoveryTarget = newRecoveryRef.get(RecoveryTarget.class); + RecoveryTarget newRecoveryTarget = newRecoveryRef.get(); assertSame(s, newRecoveryTarget.indexShard()); return newRecoveryTarget; }, false);