Skip to content

Commit

Permalink
2.2.x Fixing bug : Avoid registering subscriber class multiple times (G…
Browse files Browse the repository at this point in the history
…oogleCloudDataproc#1227)

* Fixing bug to avoid registering subscriber class multiple times

* Refactoring for Singleton pattern
  • Loading branch information
guljain committed Jul 30, 2024
1 parent 1ba5bc2 commit cfc0730
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.GoogleLogger;
import io.grpc.Status;
import java.io.IOException;
import javax.annotation.Nonnull;
Expand All @@ -32,6 +34,23 @@ public GoogleCloudStorageEventSubscriber(GhfsGlobalStorageStatistics storageStat
this.storageStatistics = storageStatistics;
}

/*
* Singleton class such that registration of subscriber methods is only once.
* */
public static synchronized GoogleCloudStorageEventSubscriber getInstance(
@Nonnull GhfsStorageStatistics storageStatistics) {
if (INSTANCE == null) {
logger.atFiner().log("Subscriber class invoked for first time");
INSTANCE = new GoogleCloudStorageEventSubscriber(storageStatistics);
}
return INSTANCE;
}

@VisibleForTesting
protected static void reset() {
INSTANCE = null;
}

/**
* Updating the required gcs specific statistics based on HttpResponseException.
*
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class GoogleCloudStorageStatisticsTest {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private GhfsGlobalStorageStatistics storageStatistics = new GhfsGlobalStorageStatistics();
protected GoogleCloudStorageEventSubscriber subscriber =
new GoogleCloudStorageEventSubscriber(storageStatistics);
GoogleCloudStorageEventSubscriber.getInstance(storageStatistics);

@Before
public void setUp() throws Exception {
Expand All @@ -59,6 +59,7 @@ public void setUp() throws Exception {
@After
public void cleanup() throws Exception {
GoogleCloudStorageEventBus.unregister(subscriber);
GoogleCloudStorageEventSubscriber.reset();
}

private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) {
Expand All @@ -78,6 +79,21 @@ private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) {
assertThat(metricsVerified).isTrue();
}

@Test
public void test_multiple_register_of_statistics() throws Exception {
GoogleCloudStorageEventBus.register(subscriber);
GoogleCloudStorageEventBus.register(subscriber);
GoogleCloudStorageEventBus.register(
GoogleCloudStorageEventSubscriber.getInstance(storageStatistics));
GoogleCloudStorageEventBus.register(
GoogleCloudStorageEventSubscriber.getInstance(storageStatistics));

GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
GhfsStorageStatistics verifyCounterStats = new GhfsStorageStatistics();
verifyCounterStats.incrementCounter(GCS_API_REQUEST_COUNT, 1);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_requestCounter() throws Exception {
GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@
import com.google.cloud.hadoop.gcsio.MethodOutcome;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.cloud.hadoop.gcsio.testing.InMemoryGoogleCloudStorage;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.HadoopCredentialsConfiguration.AuthenticationType;
import com.google.cloud.hadoop.util.testing.TestingAccessTokenProvider;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Ints;
Expand Down Expand Up @@ -2286,6 +2283,259 @@ public void testInitializeCompatibleWithHadoopCredentialProvider() throws Except
// Initialization successful with no exception thrown.
}

@Test
public void register_subscriber_multiple_time() throws Exception {
GoogleHadoopFileSystem myGhfs =
createInMemoryGoogleHadoopFileSystem(); // registers the subscriber class first time in
// myGhfs1
StorageStatistics stats = TestUtils.getStorageStatistics();

GoogleCloudStorageEventBus.register(
GoogleCloudStorageEventSubscriber.getInstance(
(GhfsStorageStatistics) stats)); // registers the same subscriber class second time

assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(0);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(0);

try (FSDataOutputStream fout = myGhfs.create(new Path("/file1"))) {
fout.writeBytes("Test Content");
}
assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(1);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(1);
assertThat(myGhfs.delete(new Path("/file1"))).isTrue();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_CREATE, 1);
}

@Test
public void create_IOstatistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();

assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(0);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(0);

try (FSDataOutputStream fout = myGhfs.create(new Path("/file1"))) {
fout.writeBytes("Test Content");
}
assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(1);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(1);
assertThat(myGhfs.delete(new Path("/file1"))).isTrue();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_CREATE, 1);
}

@Test
public void open_IOstatistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();

Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
FSDataOutputStream fout = myGhfs.create(new Path("/directory1/file1"));
fout.writeBytes("data");
fout.close();
myGhfs.open(new Path("/directory1/file1"));
assertThat(getMetricValue(stats, INVOCATION_OPEN)).isEqualTo(1);
assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_OPEN, 1);
}

@Test
public void delete_IOstatistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();

FSDataOutputStream fout = myGhfs.create(new Path("/file1"));
fout.writeBytes("data");
fout.close();
myGhfs.delete(new Path("/file1"));
assertThat(getMetricValue(stats, INVOCATION_DELETE)).isEqualTo(1);

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_DELETE, 1);
}

@Test
public void mkdirs_IOstatistics() throws IOException {
StorageStatistics stats = TestUtils.getStorageStatistics();
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_MKDIRS, 1);
}

@Test
public void delete_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path filePath = new Path("/file1");
FSDataOutputStream fout = myGhfs.create(filePath);
fout.writeBytes("Test Content");
fout.close();
assertThat(myGhfs.delete(filePath)).isTrue();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_DELETE, 1);
}

@Test
public void GlobStatus_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
myGhfs.mkdirs(new Path("/directory1/subdirectory1"));
myGhfs.create(new Path("/directory1/subdirectory1/file1")).writeBytes("data");
myGhfs.globStatus(new Path("/d*"));

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_GLOB_STATUS, 1);

assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void getFileStatus_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
FSDataOutputStream fout = myGhfs.create(new Path("/directory1/file1"));
fout.writeBytes("data");
fout.close();
myGhfs.getFileStatus(new Path("/directory1/file1"));

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_GET_FILE_STATUS, 1);

assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void createNonRecursive_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
Path filePath = new Path("/directory1/file1");
try (FSDataOutputStream createNonRecursiveOutputStream =
myGhfs.createNonRecursive(filePath, true, 1, (short) 1, 1, () -> {})) {
createNonRecursiveOutputStream.write(1);
}
TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, INVOCATION_CREATE_NON_RECURSIVE, 1);

assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void getFileChecksum_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
Path filePath = new Path("/directory1/file1");
FSDataOutputStream fout = myGhfs.create(new Path("/directory1/file1"));
fout.writeBytes("data");
fout.close();
myGhfs.getFileChecksum(filePath);

assertThat(stats.getLong(GhfsStatistic.INVOCATION_GET_FILE_CHECKSUM.getSymbol())).isEqualTo(1);
assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void rename_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
Path source = new Path("/directory1/file1");
myGhfs.create(source).writeBytes("data");
Path dest = new Path("/directory1/file2");
myGhfs.rename(source, dest);

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_RENAME, 1);

assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void xattr_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
Path filePath = new Path("/directory1/file1");
StorageStatistics stats = TestUtils.getStorageStatistics();
myGhfs.create(filePath).writeBytes("data");

myGhfs.getXAttrs(filePath);

TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, GhfsStatistic.INVOCATION_XATTR_GET_MAP, 1);

myGhfs.getXAttr(filePath, "test-xattr_statistics");

TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, GhfsStatistic.INVOCATION_XATTR_GET_NAMED, 1);

myGhfs.getXAttrs(
filePath,
ImmutableList.of("test-xattr-statistics", "test-xattr-statistics1", "test-xattr"));

TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, GhfsStatistic.INVOCATION_XATTR_GET_NAMED_MAP, 1);

myGhfs.listXAttrs(filePath);

TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, GhfsStatistic.INVOCATION_OP_XATTR_LIST, 1);

assertThat(myGhfs.delete(testRoot, true)).isTrue();
}

@Test
public void verify_storage_statistics_metrics() {
StorageStatistics statistics = new GhfsStorageStatistics();
ArrayList<LongStatistic> metrics = Lists.newArrayList(statistics.getLongStatistics());
HashSet<String> metricNames = new HashSet<>();
for (LongStatistic longStatistic : metrics) {
metricNames.add(longStatistic.getName());
}

String statsString = statistics.toString();
int expected = 0;
for (GhfsStatistic ghfsStatistic : GhfsStatistic.values()) {
expected++;

String metricName = ghfsStatistic.getSymbol();

checkMetric(metricName, statistics, metricNames, statsString);
if (ghfsStatistic.getType() == StatisticTypeEnum.TYPE_DURATION
|| ghfsStatistic.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
expected += 3;

for (String suffix : ImmutableList.of("_min", "_max", "_mean")) {
checkMetric(metricName + suffix, statistics, metricNames, statsString);
}
if (ghfsStatistic.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
expected++;
checkMetric(metricName + "_duration", statistics, metricNames, statsString);
}
}
}

for (GoogleCloudStorageStatistics googleCloudStorageStatusStatistic :
GoogleCloudStorageStatistics.values()) {
expected++;

String metricName = googleCloudStorageStatusStatistic.getSymbol();

checkMetric(metricName, statistics, metricNames, statsString);
}

assertEquals(expected, metricNames.size());
assertThat(statistics.isTracked("invalid")).isFalse();
assertEquals(0, statistics.getLong("invalid").longValue());
}

@Test
public void testThreadTraceEnabledRename() throws Exception {
Configuration config = ghfs.getConf();
Expand Down

0 comments on commit cfc0730

Please sign in to comment.