Skip to content

Commit

Permalink
Add open() API with FileStatus (GoogleCloudDataproc#1244)
Browse files Browse the repository at this point in the history
  • Loading branch information
arunkumarchacko committed Sep 9, 2024
1 parent 7257fb4 commit ea0c048
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 0 deletions.
1 change: 1 addition & 0 deletions gcs/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release Notes

## Next

1. Add gRPC configuration documentation

1. Remove Hadoop 2.x support.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,33 @@ public FSDataInputStream open(Path hadoopPath, int bufferSize) throws IOExceptio
});
}

/**
 * Opens the file referenced by the given {@link FileStatus} for reading.
 *
 * <p>The status must be a {@link GoogleHadoopFileStatus} previously produced by this file system,
 * so its embedded {@code FileInfo} can be reused and a redundant metadata fetch avoided.
 *
 * @param status a file status previously returned by this file system
 * @return a stream positioned at the start of the file contents
 * @throws IllegalArgumentException if {@code status} is not a {@link GoogleHadoopFileStatus}, or
 *     its path does not belong to this file system's bucket
 * @throws IOException if the file system is closed or the stream cannot be created
 */
public FSDataInputStream open(FileStatus status) throws IOException {
  logger.atFine().log("openWithStatus(%s)", status);

  // Only statuses created by this connector carry the FileInfo needed to skip a metadata lookup;
  // plain instanceof is equivalent to the Class.isAssignableFrom check for any non-null status.
  if (!(status instanceof GoogleHadoopFileStatus)) {
    throw new IllegalArgumentException(
        String.format(
            "Expected status to be of type GoogleHadoopFileStatus, but found %s",
            status.getClass()));
  }

  GoogleHadoopFileStatus fileStatus = (GoogleHadoopFileStatus) status;

  // Rejects paths whose bucket does not match the bucket this instance was initialized with.
  checkPath(status.getPath());

  return trackDurationWithTracing(
      instrumentation,
      globalStorageStatistics,
      GhfsStatistic.INVOCATION_OPEN,
      status.getPath(),
      this.traceFactory,
      () -> {
        checkOpen();
        return new FSDataInputStream(
            GoogleHadoopFSInputStream.create(this, fileStatus.getFileInfo(), statistics));
      });
}

@Override
public FSDataOutputStream create(
Path hadoopPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.cloud.hadoop.gcsio.FileInfo;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemImpl;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.gcsio.MethodOutcome;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.cloud.hadoop.gcsio.testing.InMemoryGoogleCloudStorage;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.HadoopCredentialsConfiguration.AuthenticationType;
import com.google.cloud.hadoop.util.testing.TestingAccessTokenProvider;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -47,6 +50,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -255,6 +259,55 @@ public void testGetDefaultPortIndicatesPortsAreNotUsed() throws Exception {
assertThat(ghfs.getDefaultPort()).isEqualTo(-1);
}

@Test
public void testFileOpenWithStatus() throws Exception {
  URI initUri = new URI("gs://read-test-bucket/");
  URI mismatchedUri = new URI("gs://read-test-bucket-other/");

  // Build a status for an object that lives in the bucket the file system is initialized with.
  StorageResourceId resourceId = new StorageResourceId(initUri.getAuthority(), "bar/test/object");
  GoogleCloudStorageItemInfo itemInfo =
      GoogleCloudStorageItemInfo.createObject(
          resourceId,
          /* creationTime= */ 10L,
          /* modificationTime= */ 15L,
          /* size= */ 200L,
          "text/plain",
          /* contentEncoding= */ "lzma",
          /* metadata= */ ImmutableMap.of("foo-meta", new byte[] {5, 66, 56}),
          /* contentGeneration= */ 312432L,
          /* metaGeneration= */ 2L,
          /* verificationAttributes= */ null);
  FileInfo fileInfo = FileInfo.fromItemInfo(itemInfo);

  GoogleHadoopFileStatus status =
      new GoogleHadoopFileStatus(
          fileInfo, new Path(fileInfo.getPath()), 1, 2, FsPermission.getFileDefault(), "foo");

  try (GoogleHadoopFileSystem fs = new GoogleHadoopFileSystem()) {
    // Opening against the matching bucket succeeds.
    fs.initialize(initUri, new Configuration());
    fs.open(status);

    // Re-initializing against a different bucket makes the same status invalid.
    fs.initialize(mismatchedUri, new Configuration());
    IllegalArgumentException exception =
        assertThrows(IllegalArgumentException.class, () -> fs.open(status));
    assertThat(exception.getMessage())
        .isEqualTo(
            "Wrong bucket: read-test-bucket, in path: gs://read-test-bucket/bar/test/object, expected bucket: read-test-bucket-other");
  }
}

@Test
public void testFileOpenWithStatusInvalidType() throws Exception {
  try (GoogleHadoopFileSystem fs = new GoogleHadoopFileSystem()) {
    fs.initialize(new URI("gs://read-test-bucket/"), new Configuration());

    // A plain FileStatus carries no FileInfo, so the typed open() must reject it.
    FileStatus plainStatus = new FileStatus();
    IllegalArgumentException exception =
        assertThrows(IllegalArgumentException.class, () -> fs.open(plainStatus));
    assertThat(exception.getMessage())
        .isEqualTo(
            "Expected status to be of type GoogleHadoopFileStatus, but found class org.apache.hadoop.fs.FileStatus");
  }
}

// -----------------------------------------------------------------
// Inherited tests that we suppress because their behavior differs
// from the base class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Instant;
Expand All @@ -49,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -1067,6 +1069,37 @@ public void testInvalidRangeRequest() throws Exception {
}
}

@Test
public void testGetOpenWithStatus() throws Exception {
  String expectedContent = UUID.randomUUID().toString();
  Path hadoopPath = ghfsHelper.castAsHadoopPath(getTempFilePath());
  ghfsHelper.writeFile(hadoopPath, expectedContent, 1, /* overwrite= */ false);

  FileStatus status = ghfs.getFileStatus(hadoopPath);

  // open(FileStatus) is not declared on the FileSystem base class, so look it up reflectively.
  Method openWithStatus = ghfs.getClass().getMethod("open", FileStatus.class);
  try (FSDataInputStream in = (FSDataInputStream) openWithStatus.invoke(ghfs, status)) {
    assertThat(readContent(in)).isEqualTo(expectedContent);
  }
}

/**
 * Reads the remainder of {@code readStream} and decodes it as a UTF-8 string.
 *
 * <p>Accumulates the raw bytes first and decodes once at the end: decoding each 1024-byte chunk
 * independently (the previous per-read {@code new String(...)}) corrupts any multi-byte UTF-8
 * character that straddles a chunk boundary.
 *
 * @param readStream stream to read to exhaustion; not closed by this method
 * @return the decoded contents
 * @throws IOException if reading from the stream fails
 */
private static String readContent(FSDataInputStream readStream) throws IOException {
  byte[] readBuffer = new byte[1024];
  ByteArrayOutputStream contents = new ByteArrayOutputStream();

  int numBytesRead = readStream.read(readBuffer);
  while (numBytesRead > 0) {
    contents.write(readBuffer, 0, numBytesRead);
    numBytesRead = readStream.read(readBuffer);
  }

  return new String(contents.toByteArray(), UTF_8);
}

private void validateVectoredReadResult(List<FileRange> fileRanges, Path hadoopPath)
throws Exception {
for (FileRange range : fileRanges) {
Expand Down

0 comments on commit ea0c048

Please sign in to comment.