From 1fc326788f74ee0590ed67ed957facd12fd44035 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 1 Nov 2024 21:45:10 +0530 Subject: [PATCH] Support reading S3 bucket root URI --- .../java/io/deephaven/base/FileUtils.java | 2 +- .../java/io/deephaven/base/FileUtilsTest.java | 3 + .../parquet/table/S3ParquetTestBase.java | 62 +++++++++++++++++++ .../s3/S3SeekableChannelProvider.java | 6 +- 4 files changed, 70 insertions(+), 3 deletions(-) diff --git a/Base/src/main/java/io/deephaven/base/FileUtils.java b/Base/src/main/java/io/deephaven/base/FileUtils.java index e55489bc75f..820e733c0a9 100644 --- a/Base/src/main/java/io/deephaven/base/FileUtils.java +++ b/Base/src/main/java/io/deephaven/base/FileUtils.java @@ -282,7 +282,7 @@ public static URI convertToURI(final String source, final boolean isDirectory) { return convertToURI(new File(uri), isDirectory); } String path = uri.getPath(); - final boolean endsWithSlash = path.charAt(path.length() - 1) == URI_SEPARATOR_CHAR; + final boolean endsWithSlash = !path.isEmpty() && path.charAt(path.length() - 1) == URI_SEPARATOR_CHAR; if (!isDirectory && endsWithSlash) { throw new IllegalArgumentException("Non-directory URI should not end with a slash: " + uri); } diff --git a/Base/src/test/java/io/deephaven/base/FileUtilsTest.java b/Base/src/test/java/io/deephaven/base/FileUtilsTest.java index fc00e5802cb..3cf4c7433d8 100644 --- a/Base/src/test/java/io/deephaven/base/FileUtilsTest.java +++ b/Base/src/test/java/io/deephaven/base/FileUtilsTest.java @@ -53,6 +53,9 @@ public void testConvertToS3URI() throws URISyntaxException { // Check if multiple slashes get normalized Assert.assertEquals("s3://bucket/key/", FileUtils.convertToURI("s3://bucket///key///", true).toString()); + // Check if trailing slash gets added to bucket root + Assert.assertEquals("s3://bucket/", FileUtils.convertToURI("s3://bucket", true).toString()); + try { FileUtils.convertToURI("", false); Assert.fail("Expected IllegalArgumentException"); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java index 7086fad8312..7c0bd5d9369 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java @@ -587,4 +587,66 @@ public void testReadWriteUsingProfile() throws IOException { Files.delete(tempCredentialsFile); } } + + @Test + public void testReadBucketRootKeyValuePartitioned() { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofInt("someIntColumn"), + ColumnDefinition.ofString("someStringColumn")); + final Table table = ((QueryTable) TableTools.emptyTable(500_000) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (int)(ii%2)", + "someIntColumn = (int) i", + "someStringColumn = String.valueOf(i)")) + .withDefinitionUnsafe(definition); + final URI bucketRoot = URI.create(String.format("s3://%s", bucket)); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .setTableDefinition(definition) + .setBaseNameForPartitionedParquetData("data") + .build(); + writeKeyValuePartitionedTable(table, bucketRoot.toString(), instructions); + { + final Table fromS3 = ParquetTools.readTable(bucketRoot.toString(), instructions); + assertTableEquals(table.sort("PC1", "PC2"), fromS3.sort("PC1", "PC2")); + } + { + final URI bucketRootWithSlash = URI.create(String.format("s3://%s/", bucket)); + final Table fromS3 = ParquetTools.readTable(bucketRootWithSlash.toString(), instructions); + assertTableEquals(table.sort("PC1", "PC2"), fromS3.sort("PC1", "PC2")); + } + } + + @Test + public void testReadBucketRootFlatPartitioned() { + final Table table = getTable(100_000); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); + for (int i = 0; i < 3; ++i) { + final URI dest = uri("table" + i + ".parquet"); + ParquetTools.writeTable(table, dest.toString(), instructions); + } + + final URI bucketRoot = URI.create(String.format("s3://%s", bucket)); + final Table expected = merge(table, table, table); + { + final Table fromS3AsFlat = ParquetTools.readTable(bucketRoot.toString(), + instructions.withLayout(ParquetInstructions.ParquetFileLayout.FLAT_PARTITIONED)); + assertTableEquals(expected, fromS3AsFlat); + } + { + final Table fromS3AsKV = ParquetTools.readTable(bucketRoot.toString(), + instructions.withLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)); + assertTableEquals(expected, fromS3AsKV); + } + } } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java index d7f59de2418..477c2031831 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java @@ -174,7 +174,7 @@ Stream createStream( { final S3Uri s3DirectoryURI = s3AsyncClient.utilities().parseUri(directory); bucketName = s3DirectoryURI.bucket().orElseThrow(); - directoryKey = s3DirectoryURI.key().orElseThrow(); + directoryKey = s3DirectoryURI.key().orElse(""); // Empty string for the bucket root } @Override @@ -209,8 +209,10 @@ public URI next() { private void fetchNextBatch() throws IOException { final ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder() .bucket(bucketName) - .prefix(directoryKey) .maxKeys(MAX_KEYS_PER_BATCH); + if (!directoryKey.isEmpty()) { + requestBuilder.prefix(directoryKey); + } if (!isRecursive) { // Add a delimiter to the request if we don't want to fetch all files recursively requestBuilder.delimiter("/");