From ce806747038422a76e553e1c4fa236b782641674 Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Thu, 18 Jul 2024 12:31:19 -0500
Subject: [PATCH] Resolving review comments

---
 .../io/deephaven/parquet/base/ParquetUtils.java    | 16 +++++++++++++---
 .../io/deephaven/parquet/table/ParquetTools.java   |  7 ++-----
 .../table/layout/ParquetMetadataFileLayout.java    |  2 +-
 .../parquet/table/S3ParquetTestBase.java           |  5 +++++
 .../extensions/s3/S3SeekableByteChannel.java       |  1 -
 .../extensions/s3/S3SeekableChannelProvider.java   | 16 ++++++++++------
 6 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java
index e308fa226be..7aa61277ca2 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java
@@ -49,16 +49,26 @@ public static String getPerFileMetadataKey(final String filePath) {
         return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_');
     }
 
+    /**
+     * This method verifies if the source points to a parquet file. Provided source can be a local file path or a URI.
+     * Also, it can point to a parquet file, metadata file or a directory.
+     */
+    public static boolean isParquetFile(@NotNull final String source) {
+        return source.endsWith(PARQUET_FILE_EXTENSION);
+    }
+
     /**
      * This method verifies if the source points to a metadata file. Provided source can be a local file path or a URI.
      * Also, it can point to a parquet file, metadata file or a directory.
      */
     public static boolean isMetadataFile(@NotNull final String source) {
-        boolean ret = source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
+        if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) {
+            return true;
+        }
         if (File.separatorChar != URI_SEPARATOR_CHAR) {
-            ret = ret || source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
+            return source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
         }
-        return ret;
+        return false;
     }
 
     /**
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
index fca0d187aa9..19d8ff09c71 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
@@ -126,8 +126,8 @@ public static Table readTable(@NotNull final String source) {
     public static Table readTable(
             @NotNull final String source,
             @NotNull final ParquetInstructions readInstructions) {
-        final boolean isParquetFile = source.endsWith(PARQUET_FILE_EXTENSION);
-        final boolean isMetadataFile = ParquetUtils.isMetadataFile(source);
+        final boolean isParquetFile = ParquetUtils.isParquetFile(source);
+        final boolean isMetadataFile = !isParquetFile && ParquetUtils.isMetadataFile(source);
         final boolean isDirectory = !isParquetFile && !isMetadataFile;
         final URI sourceURI = convertToURI(source, isDirectory);
         if (readInstructions.getFileLayout().isPresent()) {
@@ -1130,9 +1130,6 @@ private static Table readKeyValuePartitionedTable(
                     MAX_PARTITIONING_LEVELS_INFERENCE, readInstructions, channelsProvider), readInstructions);
         }
         final TableDefinition tableDefinition = readInstructions.getTableDefinition().get();
-        if (tableDefinition.getColumnStream().noneMatch(ColumnDefinition::isPartitioning)) {
-            throw new IllegalArgumentException("No partitioning columns");
-        }
         return readTable(ParquetKeyValuePartitionedLayout.create(directoryUri, tableDefinition, readInstructions,
                 channelsProvider), readInstructions);
     }
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
index 56b6d309570..4ec93d85a1b 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
@@ -90,7 +90,7 @@ public static ParquetMetadataFileLayout create(
             @Nullable SeekableChannelsProvider channelsProvider) {
         final String path = source.getRawPath();
         final boolean isMetadataFile = path.endsWith(METADATA_FILE_URI_SUFFIX);
-        final boolean isCommonMetadataFile = path.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
+        final boolean isCommonMetadataFile = !isMetadataFile && path.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
         final boolean isDirectory = !isMetadataFile && !isCommonMetadataFile;
         final URI directory = isDirectory ? source : source.resolve(".");
         final URI metadataFileURI = isMetadataFile ? source : directory.resolve(METADATA_FILE_NAME);
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java
index d2c4aee3e4d..17e99079e1c 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java
@@ -136,6 +136,11 @@ public final void readFlatPartitionedParquetDataAsKVPartitioned()
         final Table fromS3AsKV = ParquetTools.readTable(uri.toString(),
                 readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED));
         assertTableEquals(expected, fromS3AsKV);
+
+        // Read with definition without layout
+        final Table fromS3AsFlatWithDefinition = ParquetTools.readTable(uri.toString(),
+                readInstructions.withTableDefinition(expected.getDefinition()));
+        assertTableEquals(expected, fromS3AsFlatWithDefinition);
     }
 
     @Test
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java
index 16c9b1e5d23..6f0d6ffa057 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java
@@ -4,7 +4,6 @@
 package io.deephaven.extensions.s3;
 
 import io.deephaven.base.verify.Assert;
-import io.deephaven.base.verify.Require;
 import io.deephaven.util.channel.CachedChannelProvider;
 import io.deephaven.util.channel.SeekableChannelContext;
 import org.jetbrains.annotations.NotNull;
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
index 45a48aeb227..7083e1c22cf 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
@@ -54,6 +54,7 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider {
 
     private static final int MAX_KEYS_PER_BATCH = 1000;
+    private static final int UNKNOWN_SIZE = -1;
 
     private static final Logger log = LoggerFactory.getLogger(S3SeekableChannelProvider.class);
 
@@ -81,7 +82,7 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider {
 
     @Override
     public boolean exists(@NotNull final URI uri) {
-        if (getCachedSize(uri) >= 0) {
+        if (getCachedSize(uri) != UNKNOWN_SIZE) {
             return true;
         }
         final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri);
@@ -101,7 +102,7 @@ public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext
         final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri);
         // context is unused here, will be set before reading from the channel
         final long cachedSize = getCachedSize(uri);
-        if (cachedSize >= 0) {
+        if (cachedSize != UNKNOWN_SIZE) {
             return new S3SeekableByteChannel(s3Uri, cachedSize);
         }
         return new S3SeekableByteChannel(s3Uri);
@@ -261,12 +262,12 @@ private Map<URI, FileSizeInfo> getFileSizeCache() {
      */
     long fetchFileSize(@NotNull final S3Uri s3Uri) throws IOException {
         final long cachedSize = getCachedSize(s3Uri.uri());
-        if (cachedSize >= 0) {
+        if (cachedSize != UNKNOWN_SIZE) {
             return cachedSize;
         }
         // Fetch the size of the file using a blocking HEAD request, and store it in the cache for future use
         if (log.isDebugEnabled()) {
-            log.debug().append("Head: ").endl();
+            log.debug().append("Head: ").append(s3Uri.toString()).endl();
         }
         final HeadObjectResponse headObjectResponse;
         try {
@@ -285,7 +286,7 @@ long fetchFileSize(@NotNull final S3Uri s3Uri) throws IOException {
     }
 
     /**
-     * Get the cached size for the given URI, or -1 if the size is not cached.
+     * Get the cached size for the given URI, or {@value UNKNOWN_SIZE} if the size is not cached.
      */
     private long getCachedSize(final URI uri) {
         final Map<URI, FileSizeInfo> fileSizeCache = fileSizeCacheRef.get();
@@ -295,13 +296,16 @@ private long getCachedSize(final URI uri) {
                 return sizeInfo.size;
             }
         }
-        return -1;
+        return UNKNOWN_SIZE;
     }
 
     /**
      * Cache the file size for the given URI.
      */
     private void updateFileSizeCache(@NotNull final URI uri, final long size) {
+        if (size < 0) {
+            throw new IllegalArgumentException("Invalid file size: " + size + " for URI " + uri);
+        }
         final Map<URI, FileSizeInfo> fileSizeCache = getFileSizeCache();
         fileSizeCache.compute(uri, (key, existingInfo) -> {
             if (existingInfo == null) {
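
Usage note (not part of the patch): a minimal sketch of how the updated entry points compose after this change. Only ParquetTools.readTable(String, ParquetInstructions), ParquetInstructions.withTableDefinition, and the new ParquetUtils.isParquetFile / isMetadataFile checks come from the patch; the wrapper class, its method name, and the assumption that the caller already holds S3-ready ParquetInstructions are illustrative only.

    import io.deephaven.engine.table.Table;
    import io.deephaven.engine.table.TableDefinition;
    import io.deephaven.parquet.base.ParquetUtils;
    import io.deephaven.parquet.table.ParquetInstructions;
    import io.deephaven.parquet.table.ParquetTools;

    // Hypothetical helper, not part of the Deephaven API.
    public final class ReadParquetSketch {

        /**
         * Reads a parquet source using the same classification the patched
         * readTable(String, ParquetInstructions) applies: a ".parquet" file or a
         * metadata file is read directly, anything else is treated as a directory.
         */
        public static Table read(final String source, final ParquetInstructions instructions,
                final TableDefinition definition) {
            if (ParquetUtils.isParquetFile(source) || ParquetUtils.isMetadataFile(source)) {
                // Single parquet file or _metadata/_common_metadata file: read as-is.
                return ParquetTools.readTable(source, instructions);
            }
            // Directory: with this patch, supplying a table definition is enough even when it
            // contains no partitioning columns; no explicit ParquetFileLayout needs to be set.
            return ParquetTools.readTable(source, instructions.withTableDefinition(definition));
        }
    }

This mirrors the new S3ParquetTestBase case, where a flat-partitioned S3 directory is read by passing only the expected definition instead of an explicit layout.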