From ea0c048865d31931ef42ba4e9048686a9143015d Mon Sep 17 00:00:00 2001 From: Arunkumar Chacko Date: Mon, 9 Sep 2024 13:37:58 +0530 Subject: [PATCH] Add open() API with FileStatus (#1244) --- gcs/CHANGES.md | 1 + .../hadoop/fs/gcs/GoogleHadoopFileSystem.java | 27 ++++++++++ .../fs/gcs/GoogleHadoopFileSystemTest.java | 53 +++++++++++++++++++ .../fs/gcs/HadoopFileSystemTestBase.java | 33 ++++++++++++ 4 files changed, 114 insertions(+) diff --git a/gcs/CHANGES.md b/gcs/CHANGES.md index 917de555d4..0580f8ef94 100644 --- a/gcs/CHANGES.md +++ b/gcs/CHANGES.md @@ -1,6 +1,7 @@ # Release Notes ## Next + 1. Add gRPC configuration documentation 1. Remove Hadoop 2.x support. diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java index 3b25b1c96d..fd5d7504cf 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java @@ -583,6 +583,33 @@ public FSDataInputStream open(Path hadoopPath, int bufferSize) throws IOExceptio }); } + public FSDataInputStream open(FileStatus status) throws IOException { + logger.atFine().log("openWithStatus(%s)", status); + + if (!GoogleHadoopFileStatus.class.isAssignableFrom(status.getClass())) { + throw new IllegalArgumentException( + String.format( + "Expected status to be of type GoogleHadoopFileStatus, but found %s", + status.getClass())); + } + + GoogleHadoopFileStatus fileStatus = (GoogleHadoopFileStatus) status; + + checkPath(status.getPath()); + + return trackDurationWithTracing( + instrumentation, + globalStorageStatistics, + GhfsStatistic.INVOCATION_OPEN, + status.getPath(), + this.traceFactory, + () -> { + checkOpen(); + return new FSDataInputStream( + GoogleHadoopFSInputStream.create(this, fileStatus.getFileInfo(), statistics)); + }); + } + @Override public FSDataOutputStream create( Path hadoopPath, diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemTest.java index 356f15ef75..c4b1671848 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemTest.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemTest.java @@ -22,6 +22,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import com.google.cloud.hadoop.gcsio.FileInfo; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemImpl; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions; @@ -29,10 +30,12 @@ import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; import com.google.cloud.hadoop.gcsio.MethodOutcome; +import com.google.cloud.hadoop.gcsio.StorageResourceId; import com.google.cloud.hadoop.gcsio.testing.InMemoryGoogleCloudStorage; import com.google.cloud.hadoop.util.AccessTokenProvider; import com.google.cloud.hadoop.util.HadoopCredentialsConfiguration.AuthenticationType; import com.google.cloud.hadoop.util.testing.TestingAccessTokenProvider; +import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.CanIgnoreReturnValue; import java.io.FileNotFoundException; import java.io.IOException; @@ -47,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -255,6 +259,55 @@ public void testGetDefaultPortIndicatesPortsAreNotUsed() throws Exception { assertThat(ghfs.getDefaultPort()).isEqualTo(-1); } + @Test + public void testFileOpenWithStatus() throws Exception { + URI bucketName = new URI("gs://read-test-bucket/"); + URI failureBucketName = new URI("gs://read-test-bucket-other/"); + + FileInfo fileInfo = + FileInfo.fromItemInfo( + GoogleCloudStorageItemInfo.createObject( + new StorageResourceId(bucketName.getAuthority(), "bar/test/object"), + /* creationTime= */ 10L, + /* modificationTime= */ 15L, + /* size= */ 200L, + "text/plain", + /* contentEncoding= */ "lzma", + /* metadata= */ ImmutableMap.of("foo-meta", new byte[] {5, 66, 56}), + /* contentGeneration= */ 312432L, + /* metaGeneration= */ 2L, + /* verificationAttributes= */ null)); + + GoogleHadoopFileStatus fileStatus = + new GoogleHadoopFileStatus( + fileInfo, new Path(fileInfo.getPath()), 1, 2, FsPermission.getFileDefault(), "foo"); + try (GoogleHadoopFileSystem fs = new GoogleHadoopFileSystem()) { + fs.initialize(bucketName, new Configuration()); + fs.open(fileStatus); + + fs.initialize(failureBucketName, new Configuration()); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> fs.open(fileStatus)); + assertThat(exception.getMessage()) + .isEqualTo( + "Wrong bucket: read-test-bucket, in path: gs://read-test-bucket/bar/test/object, expected bucket: read-test-bucket-other"); + } + } + + @Test + public void testFileOpenWithStatusInvalidType() throws Exception { + try (GoogleHadoopFileSystem fs = new GoogleHadoopFileSystem()) { + fs.initialize(new URI("gs://read-test-bucket/"), new Configuration()); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> fs.open(new FileStatus())); + assertThat(exception.getMessage()) + .isEqualTo( + "Expected status to be of type GoogleHadoopFileStatus, but found class org.apache.hadoop.fs.FileStatus"); + } + } + // ----------------------------------------------------------------- // Inherited tests that we suppress because their behavior differs // from the base class. diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java index 072e5180e4..be15b80814 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java @@ -40,6 +40,7 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Method; import java.net.URI; import java.nio.ByteBuffer; import java.time.Instant; @@ -49,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -1067,6 +1069,37 @@ public void testInvalidRangeRequest() throws Exception { } } + @Test + public void testGetOpenWithStatus() throws Exception { + String content = UUID.randomUUID().toString(); + Path hadoopPath = ghfsHelper.castAsHadoopPath(getTempFilePath()); + + ghfsHelper.writeFile(hadoopPath, content, 1, /* overwrite= */ false); + + FileStatus fileStatus = ghfs.getFileStatus(hadoopPath); + + Method openWithStatus = ghfs.getClass().getMethod("open", FileStatus.class); + try (FSDataInputStream readStream = + (FSDataInputStream) openWithStatus.invoke(ghfs, fileStatus)) { + String readContent = readContent(readStream); + + assertThat(readContent).isEqualTo(content); + } + } + + private static String readContent(FSDataInputStream readStream) throws IOException { + byte[] readBuffer = new byte[1024]; + StringBuilder returnBuffer = new StringBuilder(); + + int numBytesRead = readStream.read(readBuffer); + while (numBytesRead > 0) { + returnBuffer.append(new String(readBuffer, 0, numBytesRead, UTF_8)); + numBytesRead = readStream.read(readBuffer); + } + + return returnBuffer.toString(); + } + private void validateVectoredReadResult(List fileRanges, Path hadoopPath) throws Exception { for (FileRange range : fileRanges) {