Skip to content

Commit

Permalink
Cherry Pick #1227 to Branch 3.0.x -> Fixing bug : Avoid registering s…
Browse files Browse the repository at this point in the history
…ubscriber class multiple times (#1230) (#1257)



Co-authored-by: Gul Jain <[email protected]>
  • Loading branch information
arunkumarchacko and guljain authored Oct 9, 2024
1 parent 634d9ba commit c030979
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,40 @@
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.GoogleLogger;
import io.grpc.Status;
import java.io.IOException;
import javax.annotation.Nonnull;

/* Stores the subscriber methods corresponding to GoogleCloudStorageEventBus */
public class GoogleCloudStorageEventSubscriber {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private static GhfsGlobalStorageStatistics storageStatistics;
private static GoogleCloudStorageEventSubscriber INSTANCE = null;

public GoogleCloudStorageEventSubscriber(GhfsGlobalStorageStatistics storageStatistics) {
private GoogleCloudStorageEventSubscriber(GhfsGlobalStorageStatistics storageStatistics) {
this.storageStatistics = storageStatistics;
}

/*
* Singleton class such that registration of subscriber methods is only once.
* */
public static synchronized GoogleCloudStorageEventSubscriber getInstance(
@Nonnull GhfsGlobalStorageStatistics storageStatistics) {
if (INSTANCE == null) {
logger.atFiner().log("Subscriber class invoked for first time");
INSTANCE = new GoogleCloudStorageEventSubscriber(storageStatistics);
}
return INSTANCE;
}

@VisibleForTesting
protected static void reset() {
INSTANCE = null;
}

/**
* Updating the required gcs specific statistics based on HttpResponseException.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public GoogleHadoopFileSystem() {
}

GoogleCloudStorageEventBus.register(
new GoogleCloudStorageEventSubscriber(globalStorageStatistics));
GoogleCloudStorageEventSubscriber.getInstance(globalStorageStatistics));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class GoogleCloudStorageStatisticsTest {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private GhfsGlobalStorageStatistics storageStatistics = new GhfsGlobalStorageStatistics();
protected GoogleCloudStorageEventSubscriber subscriber =
new GoogleCloudStorageEventSubscriber(storageStatistics);
GoogleCloudStorageEventSubscriber.getInstance(storageStatistics);

@Before
public void setUp() throws Exception {
Expand All @@ -59,6 +59,7 @@ public void setUp() throws Exception {
@After
public void cleanup() throws Exception {
GoogleCloudStorageEventBus.unregister(subscriber);
GoogleCloudStorageEventSubscriber.reset();
}

private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) {
Expand All @@ -78,6 +79,21 @@ private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) {
assertThat(metricsVerified).isTrue();
}

@Test
public void test_multiple_register_of_statistics() throws Exception {
GoogleCloudStorageEventBus.register(subscriber);
GoogleCloudStorageEventBus.register(subscriber);
GoogleCloudStorageEventBus.register(
GoogleCloudStorageEventSubscriber.getInstance(storageStatistics));
GoogleCloudStorageEventBus.register(
GoogleCloudStorageEventSubscriber.getInstance(storageStatistics));

GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_API_REQUEST_COUNT, 1);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_requestCounter() throws Exception {
GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import com.google.cloud.hadoop.gcsio.testing.InMemoryGoogleCloudStorage;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.cloud.hadoop.util.HadoopCredentialsConfiguration.AuthenticationType;
import com.google.cloud.hadoop.util.testing.TestingAccessTokenProvider;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -2301,6 +2302,32 @@ public void testThreadTraceEnabledRename() throws Exception {
assertThat(ghfs.exists(dest)).isTrue();
}

@Test
public void register_subscriber_multiple_time() throws Exception {
GoogleHadoopFileSystem myGhfs =
createInMemoryGoogleHadoopFileSystem(); // registers the subscriber class first time in
// myGhfs1
StorageStatistics stats = TestUtils.getStorageStatistics();

GoogleCloudStorageEventBus.register(
GoogleCloudStorageEventSubscriber.getInstance(
(GhfsGlobalStorageStatistics)
stats)); // registers the same subscriber class second time

assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(0);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(0);

try (FSDataOutputStream fout = myGhfs.create(new Path("/file1"))) {
fout.writeBytes("Test Content");
}
assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(1);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(1);
assertThat(myGhfs.delete(new Path("/file1"))).isTrue();

TestUtils.verifyDurationMetric(
(GhfsGlobalStorageStatistics) stats, INVOCATION_CREATE.getSymbol(), 1);
}

private static Long getMetricValue(StorageStatistics stats, GhfsStatistic invocationCreate) {
return stats.getLong(invocationCreate.getSymbol());
}
Expand Down

0 comments on commit c030979

Please sign in to comment.