Skip to content

Commit

Permalink
Resolving review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Jul 18, 2024
1 parent 027d381 commit ce80674
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,26 @@ public static String getPerFileMetadataKey(final String filePath) {
return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_');
}

/**
 * Tests whether the provided source (a local file path or a URI string, which may name a parquet
 * file, a metadata file, or a directory) ends with the parquet file extension.
 *
 * @param source the path or URI string to test
 * @return {@code true} if the source names a ".parquet" file
 */
public static boolean isParquetFile(@NotNull final String source) {
    // endsWith-equivalent: check the extension occupies the tail of the string
    final int suffixStart = source.length() - PARQUET_FILE_EXTENSION.length();
    return suffixStart >= 0 && source.startsWith(PARQUET_FILE_EXTENSION, suffixStart);
}

/**
 * Tests whether the provided source names a parquet metadata file ("_metadata" or
 * "_common_metadata"). The source can be a local file path or a URI string, and may otherwise
 * point to a parquet file or a directory.
 *
 * @param source the path or URI string to test
 * @return {@code true} if the source ends with a metadata-file suffix
 */
public static boolean isMetadataFile(@NotNull final String source) {
    // The URI-style suffixes (using '/') apply on every platform.
    if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) {
        return true;
    }
    // On platforms whose native separator differs from '/', also accept the native-separator form.
    if (File.separatorChar != URI_SEPARATOR_CHAR) {
        return source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
    }
    return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ public static Table readTable(@NotNull final String source) {
public static Table readTable(
@NotNull final String source,
@NotNull final ParquetInstructions readInstructions) {
final boolean isParquetFile = source.endsWith(PARQUET_FILE_EXTENSION);
final boolean isMetadataFile = ParquetUtils.isMetadataFile(source);
final boolean isParquetFile = ParquetUtils.isParquetFile(source);
final boolean isMetadataFile = !isParquetFile && ParquetUtils.isMetadataFile(source);
final boolean isDirectory = !isParquetFile && !isMetadataFile;
final URI sourceURI = convertToURI(source, isDirectory);
if (readInstructions.getFileLayout().isPresent()) {
Expand Down Expand Up @@ -1130,9 +1130,6 @@ private static Table readKeyValuePartitionedTable(
MAX_PARTITIONING_LEVELS_INFERENCE, readInstructions, channelsProvider), readInstructions);
}
final TableDefinition tableDefinition = readInstructions.getTableDefinition().get();
if (tableDefinition.getColumnStream().noneMatch(ColumnDefinition::isPartitioning)) {
throw new IllegalArgumentException("No partitioning columns");
}
return readTable(ParquetKeyValuePartitionedLayout.create(directoryUri, tableDefinition,
readInstructions, channelsProvider), readInstructions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static ParquetMetadataFileLayout create(
@Nullable SeekableChannelsProvider channelsProvider) {
final String path = source.getRawPath();
final boolean isMetadataFile = path.endsWith(METADATA_FILE_URI_SUFFIX);
final boolean isCommonMetadataFile = path.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
final boolean isCommonMetadataFile = !isMetadataFile && path.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
final boolean isDirectory = !isMetadataFile && !isCommonMetadataFile;
final URI directory = isDirectory ? source : source.resolve(".");
final URI metadataFileURI = isMetadataFile ? source : directory.resolve(METADATA_FILE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public final void readFlatPartitionedParquetDataAsKVPartitioned()
final Table fromS3AsKV = ParquetTools.readTable(uri.toString(),
readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED));
assertTableEquals(expected, fromS3AsKV);

// Read with definition without layout
final Table fromS3AsFlatWithDefinition = ParquetTools.readTable(uri.toString(),
readInstructions.withTableDefinition(expected.getDefinition()));
assertTableEquals(expected, fromS3AsFlatWithDefinition);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.extensions.s3;

import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelContext;
import org.jetbrains.annotations.NotNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
final class S3SeekableChannelProvider implements SeekableChannelsProvider {

private static final int MAX_KEYS_PER_BATCH = 1000;
private static final int UNKNOWN_SIZE = -1;

private static final Logger log = LoggerFactory.getLogger(S3SeekableChannelProvider.class);

Expand Down Expand Up @@ -81,7 +82,7 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider {

@Override
public boolean exists(@NotNull final URI uri) {
if (getCachedSize(uri) >= 0) {
if (getCachedSize(uri) != UNKNOWN_SIZE) {
return true;
}
final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri);
Expand All @@ -101,7 +102,7 @@ public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext
final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri);
// context is unused here, will be set before reading from the channel
final long cachedSize = getCachedSize(uri);
if (cachedSize >= 0) {
if (cachedSize != UNKNOWN_SIZE) {
return new S3SeekableByteChannel(s3Uri, cachedSize);
}
return new S3SeekableByteChannel(s3Uri);
Expand Down Expand Up @@ -261,12 +262,12 @@ private Map<URI, FileSizeInfo> getFileSizeCache() {
*/
long fetchFileSize(@NotNull final S3Uri s3Uri) throws IOException {
final long cachedSize = getCachedSize(s3Uri.uri());
if (cachedSize >= 0) {
if (cachedSize != UNKNOWN_SIZE) {
return cachedSize;
}
// Fetch the size of the file using a blocking HEAD request, and store it in the cache for future use
if (log.isDebugEnabled()) {
log.debug().append("Head: ").endl();
log.debug().append("Head: ").append(s3Uri.toString()).endl();
}
final HeadObjectResponse headObjectResponse;
try {
Expand All @@ -285,7 +286,7 @@ long fetchFileSize(@NotNull final S3Uri s3Uri) throws IOException {
}

/**
 * Get the cached size for the given URI, or {@value #UNKNOWN_SIZE} if the size is not cached.
 */
private long getCachedSize(final URI uri) {
final Map<URI, FileSizeInfo> fileSizeCache = fileSizeCacheRef.get();
Expand All @@ -295,13 +296,16 @@ private long getCachedSize(final URI uri) {
return sizeInfo.size;
}
}
return -1;
return UNKNOWN_SIZE;
}

/**
* Cache the file size for the given URI.
*/
private void updateFileSizeCache(@NotNull final URI uri, final long size) {
if (size < 0) {
throw new IllegalArgumentException("Invalid file size: " + size + " for URI " + uri);
}
final Map<URI, FileSizeInfo> fileSizeCache = getFileSizeCache();
fileSizeCache.compute(uri, (key, existingInfo) -> {
if (existingInfo == null) {
Expand Down

0 comments on commit ce80674

Please sign in to comment.