Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
Refactoring GatedAutoCloseable to AutoCloseableRefCounted
Browse files Browse the repository at this point in the history
This is a part of the process of merging our feature branch - feature/segment-replication - back into main by re-PRing our changes from the feature branch.
GatedAutoCloseable currently wraps a subclass of RefCounted. Segment replication adds another subclass, but this also wraps RefCounted. Both subclasses have the same shutdown hook - decRef. This change makes the superclass less generic to increase code convergence.

The breakdown of the plan to merge segment-replication to main is detailed in opensearch-project#2355
Segment replication design proposal - opensearch-project#2229

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Apr 18, 2022
1 parent ca102ea commit 8f0e5bc
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@

package org.opensearch.common.concurrent;

import org.opensearch.common.util.concurrent.RefCounted;

/**
* Decorator class that wraps an object reference with a {@link Runnable} that is
* invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures
* that this is invoked only once. See also {@link GatedCloseable}
* Adapter class that enables a {@link RefCounted} implementation to function like an {@link AutoCloseable}.
* The {@link #close()} API invokes {@link RefCounted#decRef()} and ensures idempotency using a {@link OneWayGate}.
*/
public class GatedAutoCloseable<T> implements AutoCloseable {
public class AutoCloseableRefCounted<T extends RefCounted> implements AutoCloseable {

private final T ref;
private final Runnable onClose;
private final OneWayGate gate;

public GatedAutoCloseable(T ref, Runnable onClose) {
public AutoCloseableRefCounted(T ref) {
this.ref = ref;
this.onClose = onClose;
gate = new OneWayGate();
}

Expand All @@ -37,7 +36,7 @@ public T get() {
@Override
public void close() {
if (gate.close()) {
onClose.run();
ref.decRef();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* Decorator class that wraps an object reference with a {@link CheckedRunnable} that is
* invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures
* that this is invoked only once. See also {@link GatedAutoCloseable}
* that this is invoked only once. See also {@link AutoCloseableRefCounted}
*/
public class GatedCloseable<T> implements Closeable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.concurrent.GatedAutoCloseable;
import org.opensearch.common.concurrent.AutoCloseableRefCounted;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -273,14 +273,14 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
* causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link RecoveryRef#close()} is called.
*/
public static class RecoveryRef extends GatedAutoCloseable<RecoveryTarget> {
public static class RecoveryRef extends AutoCloseableRefCounted<RecoveryTarget> {

/**
* Important: {@link RecoveryTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public RecoveryRef(RecoveryTarget status) {
super(status, status::decRef);
super(status);
status.setLastAccessTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,36 @@
package org.opensearch.common.concurrent;

import org.junit.Before;
import org.opensearch.common.util.concurrent.RefCounted;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class GatedAutoCloseableTests extends OpenSearchTestCase {
public class AutoCloseableRefCountedTests extends OpenSearchTestCase {

private AtomicInteger testRef;
private GatedAutoCloseable<AtomicInteger> testObject;
private RefCounted mockRefCounted;
private AutoCloseableRefCounted<RefCounted> testObject;

@Before
public void setup() {
testRef = new AtomicInteger(0);
testObject = new GatedAutoCloseable<>(testRef, testRef::incrementAndGet);
mockRefCounted = mock(RefCounted.class);
testObject = new AutoCloseableRefCounted<>(mockRefCounted);
}

public void testGet() {
assertEquals(0, testObject.get().get());
assertEquals(mockRefCounted, testObject.get());
}

public void testClose() {
testObject.close();
assertEquals(1, testObject.get().get());
verify(mockRefCounted, atMostOnce()).decRef();
}

public void testIdempotent() {
testObject.close();
testObject.close();
assertEquals(1, testObject.get().get());
verify(mockRefCounted, atMostOnce()).decRef();
}
}

0 comments on commit 8f0e5bc

Please sign in to comment.