Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
guljain committed Jul 30, 2024
1 parent cfc0730 commit fb964ce
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@

/* Stores the subscriber methods corresponding to GoogleCloudStorageEventBus */
public class GoogleCloudStorageEventSubscriber {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private static GhfsGlobalStorageStatistics storageStatistics;
private static GoogleCloudStorageEventSubscriber INSTANCE = null;

public GoogleCloudStorageEventSubscriber(GhfsGlobalStorageStatistics storageStatistics) {
private GoogleCloudStorageEventSubscriber(GhfsGlobalStorageStatistics storageStatistics) {
this.storageStatistics = storageStatistics;
}

/*
* Singleton class such that registration of subscriber methods is only once.
* */
public static synchronized GoogleCloudStorageEventSubscriber getInstance(
@Nonnull GhfsStorageStatistics storageStatistics) {
@Nonnull GhfsGlobalStorageStatistics storageStatistics) {
if (INSTANCE == null) {
logger.atFiner().log("Subscriber class invoked for first time");
INSTANCE = new GoogleCloudStorageEventSubscriber(storageStatistics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public GoogleHadoopFileSystem() {
}

GoogleCloudStorageEventBus.register(
new GoogleCloudStorageEventSubscriber(globalStorageStatistics));
GoogleCloudStorageEventSubscriber.getInstance(globalStorageStatistics));
}

/**
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void test_multiple_register_of_statistics() throws Exception {
GoogleCloudStorageEventSubscriber.getInstance(storageStatistics));

GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
GhfsStorageStatistics verifyCounterStats = new GhfsStorageStatistics();
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_API_REQUEST_COUNT, 1);
verifyStatistics(verifyCounterStats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@
import com.google.cloud.hadoop.gcsio.MethodOutcome;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.cloud.hadoop.gcsio.testing.InMemoryGoogleCloudStorage;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.cloud.hadoop.util.HadoopCredentialsConfiguration.AuthenticationType;
import com.google.cloud.hadoop.util.testing.TestingAccessTokenProvider;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Ints;
Expand Down Expand Up @@ -2283,259 +2287,6 @@ public void testInitializeCompatibleWithHadoopCredentialProvider() throws Except
// Initialization successful with no exception thrown.
}

@Test
public void register_subscriber_multiple_time() throws Exception {
GoogleHadoopFileSystem myGhfs =
createInMemoryGoogleHadoopFileSystem(); // registers the subscriber class first time in
// myGhfs1
StorageStatistics stats = TestUtils.getStorageStatistics();

GoogleCloudStorageEventBus.register(
GoogleCloudStorageEventSubscriber.getInstance(
(GhfsStorageStatistics) stats)); // registers the same subscriber class second time

assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(0);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(0);

try (FSDataOutputStream fout = myGhfs.create(new Path("/file1"))) {
fout.writeBytes("Test Content");
}
assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(1);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(1);
assertThat(myGhfs.delete(new Path("/file1"))).isTrue();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_CREATE, 1);
}

@Test
public void create_IOstatistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();

assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(0);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(0);

try (FSDataOutputStream fout = myGhfs.create(new Path("/file1"))) {
fout.writeBytes("Test Content");
}
assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(1);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(1);
assertThat(myGhfs.delete(new Path("/file1"))).isTrue();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_CREATE, 1);
}

@Test
public void open_IOstatistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();

Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
FSDataOutputStream fout = myGhfs.create(new Path("/directory1/file1"));
fout.writeBytes("data");
fout.close();
myGhfs.open(new Path("/directory1/file1"));
assertThat(getMetricValue(stats, INVOCATION_OPEN)).isEqualTo(1);
assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_OPEN, 1);
}

@Test
public void delete_IOstatistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();

FSDataOutputStream fout = myGhfs.create(new Path("/file1"));
fout.writeBytes("data");
fout.close();
myGhfs.delete(new Path("/file1"));
assertThat(getMetricValue(stats, INVOCATION_DELETE)).isEqualTo(1);

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_DELETE, 1);
}

@Test
public void mkdirs_IOstatistics() throws IOException {
StorageStatistics stats = TestUtils.getStorageStatistics();
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_MKDIRS, 1);
}

@Test
public void delete_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path filePath = new Path("/file1");
FSDataOutputStream fout = myGhfs.create(filePath);
fout.writeBytes("Test Content");
fout.close();
assertThat(myGhfs.delete(filePath)).isTrue();

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_DELETE, 1);
}

@Test
public void GlobStatus_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
myGhfs.mkdirs(new Path("/directory1/subdirectory1"));
myGhfs.create(new Path("/directory1/subdirectory1/file1")).writeBytes("data");
myGhfs.globStatus(new Path("/d*"));

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_GLOB_STATUS, 1);

assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void getFileStatus_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
FSDataOutputStream fout = myGhfs.create(new Path("/directory1/file1"));
fout.writeBytes("data");
fout.close();
myGhfs.getFileStatus(new Path("/directory1/file1"));

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_GET_FILE_STATUS, 1);

assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void createNonRecursive_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
Path filePath = new Path("/directory1/file1");
try (FSDataOutputStream createNonRecursiveOutputStream =
myGhfs.createNonRecursive(filePath, true, 1, (short) 1, 1, () -> {})) {
createNonRecursiveOutputStream.write(1);
}
TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, INVOCATION_CREATE_NON_RECURSIVE, 1);

assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void getFileChecksum_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
Path filePath = new Path("/directory1/file1");
FSDataOutputStream fout = myGhfs.create(new Path("/directory1/file1"));
fout.writeBytes("data");
fout.close();
myGhfs.getFileChecksum(filePath);

assertThat(stats.getLong(GhfsStatistic.INVOCATION_GET_FILE_CHECKSUM.getSymbol())).isEqualTo(1);
assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void rename_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
StorageStatistics stats = TestUtils.getStorageStatistics();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
Path source = new Path("/directory1/file1");
myGhfs.create(source).writeBytes("data");
Path dest = new Path("/directory1/file2");
myGhfs.rename(source, dest);

TestUtils.verifyDurationMetric((GhfsStorageStatistics) stats, INVOCATION_RENAME, 1);

assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
}

@Test
public void xattr_storage_statistics() throws IOException {
GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
Path testRoot = new Path("/directory1/");
myGhfs.mkdirs(testRoot);
Path filePath = new Path("/directory1/file1");
StorageStatistics stats = TestUtils.getStorageStatistics();
myGhfs.create(filePath).writeBytes("data");

myGhfs.getXAttrs(filePath);

TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, GhfsStatistic.INVOCATION_XATTR_GET_MAP, 1);

myGhfs.getXAttr(filePath, "test-xattr_statistics");

TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, GhfsStatistic.INVOCATION_XATTR_GET_NAMED, 1);

myGhfs.getXAttrs(
filePath,
ImmutableList.of("test-xattr-statistics", "test-xattr-statistics1", "test-xattr"));

TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, GhfsStatistic.INVOCATION_XATTR_GET_NAMED_MAP, 1);

myGhfs.listXAttrs(filePath);

TestUtils.verifyDurationMetric(
(GhfsStorageStatistics) stats, GhfsStatistic.INVOCATION_OP_XATTR_LIST, 1);

assertThat(myGhfs.delete(testRoot, true)).isTrue();
}

@Test
public void verify_storage_statistics_metrics() {
StorageStatistics statistics = new GhfsStorageStatistics();
ArrayList<LongStatistic> metrics = Lists.newArrayList(statistics.getLongStatistics());
HashSet<String> metricNames = new HashSet<>();
for (LongStatistic longStatistic : metrics) {
metricNames.add(longStatistic.getName());
}

String statsString = statistics.toString();
int expected = 0;
for (GhfsStatistic ghfsStatistic : GhfsStatistic.values()) {
expected++;

String metricName = ghfsStatistic.getSymbol();

checkMetric(metricName, statistics, metricNames, statsString);
if (ghfsStatistic.getType() == StatisticTypeEnum.TYPE_DURATION
|| ghfsStatistic.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
expected += 3;

for (String suffix : ImmutableList.of("_min", "_max", "_mean")) {
checkMetric(metricName + suffix, statistics, metricNames, statsString);
}
if (ghfsStatistic.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
expected++;
checkMetric(metricName + "_duration", statistics, metricNames, statsString);
}
}
}

for (GoogleCloudStorageStatistics googleCloudStorageStatusStatistic :
GoogleCloudStorageStatistics.values()) {
expected++;

String metricName = googleCloudStorageStatusStatistic.getSymbol();

checkMetric(metricName, statistics, metricNames, statsString);
}

assertEquals(expected, metricNames.size());
assertThat(statistics.isTracked("invalid")).isFalse();
assertEquals(0, statistics.getLong("invalid").longValue());
}

@Test
public void testThreadTraceEnabledRename() throws Exception {
Configuration config = ghfs.getConf();
Expand Down Expand Up @@ -2673,6 +2424,32 @@ public void testHnBucketDeleteOperationOnNonExistingFolder() throws Exception {
}
}

@Test
public void register_subscriber_multiple_time() throws Exception {
GoogleHadoopFileSystem myGhfs =
createInMemoryGoogleHadoopFileSystem(); // registers the subscriber class first time in
// myGhfs1
StorageStatistics stats = TestUtils.getStorageStatistics();

GoogleCloudStorageEventBus.register(
GoogleCloudStorageEventSubscriber.getInstance(
(GhfsGlobalStorageStatistics)
stats)); // registers the same subscriber class second time

assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(0);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(0);

try (FSDataOutputStream fout = myGhfs.create(new Path("/file1"))) {
fout.writeBytes("Test Content");
}
assertThat(getMetricValue(stats, INVOCATION_CREATE)).isEqualTo(1);
assertThat(getMetricValue(stats, FILES_CREATED)).isEqualTo(1);
assertThat(myGhfs.delete(new Path("/file1"))).isTrue();

TestUtils.verifyDurationMetric(
(GhfsGlobalStorageStatistics) stats, INVOCATION_CREATE.getSymbol(), 1);
}

private void createFile(GoogleHadoopFileSystem googleHadoopFileSystem, Path path)
throws Exception {
try (FSDataOutputStream fout = googleHadoopFileSystem.create(path)) {
Expand Down

0 comments on commit fb964ce

Please sign in to comment.