
Improved threading capabilities of S3+parquet #5451

Merged (11 commits) on May 8, 2024
@@ -43,6 +43,8 @@
 import io.deephaven.time.DateTimeUtils;
 import io.deephaven.util.QueryConstants;
 import io.deephaven.util.SafeCloseable;
+import io.deephaven.util.channel.SeekableChannelsProvider;
+import io.deephaven.util.channel.SeekableChannelsProviderLoader;
 import io.deephaven.vector.IntVector;
 import io.deephaven.vector.ObjectVector;
 import junit.framework.ComparisonFailure;
@@ -58,6 +60,7 @@
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.net.URI;
 import java.nio.file.Files;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -3889,9 +3892,13 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
                 t3.updateView("Date=`2021-07-21`", "Num=300"),
                 t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");
 
-        final Table loaded = ParquetTools.readPartitionedTableInferSchema(
-                new ParquetKeyValuePartitionedLayout(testRootFile, 2, ParquetInstructions.EMPTY),
-                ParquetInstructions.EMPTY);
+        final URI testRootUri = testRootFile.toURI();
+        final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
+                testRootUri, null);
+        final ParquetInstructions instructions = ParquetInstructions.EMPTY.withChannelsProvider(provider);
+        final Table loaded = ParquetTools.readTable(
+                new ParquetKeyValuePartitionedLayout(testRootFile, 2, instructions),
+                instructions);
 
         // verify the sources are identical
         assertTableEquals(merged, loaded);
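The new pattern resolves a single SeekableChannelsProvider up front via the service loader and carries it through the ParquetInstructions, instead of letting each file open resolve its own provider. A minimal self-contained sketch of the same flow, assuming a hypothetical local partitioned directory; the layout class's package and the layout-taking readTable overload are as used by this test (if that overload is not public in your version, the string-path overloads apply), and the `2` is the number of key-value partitioning levels (Date and Num above):

    import io.deephaven.engine.table.Table;
    import io.deephaven.parquet.table.ParquetInstructions;
    import io.deephaven.parquet.table.ParquetTools;
    import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout;
    import io.deephaven.util.channel.SeekableChannelsProvider;
    import io.deephaven.util.channel.SeekableChannelsProviderLoader;

    import java.io.File;
    import java.net.URI;

    public class ExplicitProviderRead {
        public static void main(String[] args) {
            // Hypothetical root laid out as Date=.../Num=.../<file>.parquet
            final File root = new File("/data/partitioned");
            final URI rootUri = root.toURI();
            // Resolve one provider for the whole read; null means no special instructions, as in the test.
            final SeekableChannelsProvider provider =
                    SeekableChannelsProviderLoader.getInstance().fromServiceLoader(rootUri, null);
            final ParquetInstructions instructions = ParquetInstructions.EMPTY.withChannelsProvider(provider);
            // The layout and the read share the same provider-carrying instructions.
            final Table loaded = ParquetTools.readTable(
                    new ParquetKeyValuePartitionedLayout(root, 2, instructions), // 2 = partitioning depth
                    instructions);
            System.out.println(loaded.size());
        }
    }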
@@ -47,15 +47,14 @@ public class ParquetFileReader {
      * {@link UncheckedDeephavenException}.
      *
      * @param parquetFile The parquet file or the parquet metadata file
-     * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
-     *        channels
+     * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
      * @return The new {@link ParquetFileReader}
      */
     public static ParquetFileReader create(
             @NotNull final File parquetFile,
-            @Nullable final Object specialInstructions) {
+            final SeekableChannelsProvider channelsProvider) {
         try {
-            return createChecked(parquetFile, specialInstructions);
+            return createChecked(parquetFile, channelsProvider);
         } catch (IOException e) {
             throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
         }
@@ -66,15 +65,14 @@ public static ParquetFileReader create(
      * {@link UncheckedDeephavenException}.
      *
      * @param parquetFileURI The URI for the parquet file or the parquet metadata file
-     * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
-     *        channels
+     * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
      * @return The new {@link ParquetFileReader}
      */
     public static ParquetFileReader create(
             @NotNull final URI parquetFileURI,
-            @Nullable final Object specialInstructions) {
+            @Nullable final SeekableChannelsProvider channelsProvider) {
         try {
-            return createChecked(parquetFileURI, specialInstructions);
+            return createChecked(parquetFileURI, channelsProvider);
         } catch (IOException e) {
             throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
         }
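The call-site shape for these factories after the change: the caller supplies the provider explicitly instead of passing opaque specialInstructions for the factory to resolve internally. A sketch with a hypothetical path, imports as in the test hunk above (create() wraps any IOException in UncheckedDeephavenException; createChecked() throws it):

    final URI uri = new java.io.File("/data/table.parquet").toURI(); // hypothetical file
    final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance()
            .fromServiceLoader(uri, null); // null: no special instructions
    final ParquetFileReader reader = ParquetFileReader.create(uri, provider);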
@@ -84,40 +82,39 @@
      * Make a {@link ParquetFileReader} for the supplied {@link File}.
      *
      * @param parquetFile The parquet file or the parquet metadata file
-     * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
-     *        channels
+     * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
      * @return The new {@link ParquetFileReader}
      * @throws IOException if an IO exception occurs
      */
     public static ParquetFileReader createChecked(
             @NotNull final File parquetFile,
-            @Nullable final Object specialInstructions) throws IOException {
-        return createChecked(convertToURI(parquetFile, false), specialInstructions);
+            final SeekableChannelsProvider channelsProvider) throws IOException {
+        return createChecked(convertToURI(parquetFile, false), channelsProvider);
     }
 
     /**
      * Make a {@link ParquetFileReader} for the supplied {@link URI}.
      *
      * @param parquetFileURI The URI for the parquet file or the parquet metadata file
-     * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
-     *        channels
+     * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
      * @return The new {@link ParquetFileReader}
      * @throws IOException if an IO exception occurs
      */
     public static ParquetFileReader createChecked(
             @NotNull final URI parquetFileURI,
-            @Nullable final Object specialInstructions) throws IOException {
-        final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
-                parquetFileURI, specialInstructions);
-        return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(provider, 1 << 7));
+            final SeekableChannelsProvider channelsProvider) throws IOException {
+        return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(channelsProvider, 1 << 7));
     }
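Note that both createChecked overloads still wrap the supplied provider in a CachedChannelProvider capped at 1 << 7 = 128 cached channels; the difference is that the provider itself now arrives from the caller and can be shared across readers, rather than being re-resolved from specialInstructions on every call.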

     /**
      * Create a new ParquetFileReader for the provided source.
      *
      * @param source The source path or URI for the parquet file or the parquet metadata file
      * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
+     *
+     * @deprecated: Use {@link #createChecked(URI, SeekableChannelsProvider)} instead
      */
+    @Deprecated
     public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider)
             throws IOException {
         this(convertToURI(source, false), channelsProvider);
@@ -128,7 +125,10 @@ public ParquetFileReader(final String source, final SeekableChannelsProvider cha
      *
      * @param parquetFileURI The URI for the parquet file or the parquet metadata file
      * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
+     *
+     * @deprecated: Use {@link #createChecked(URI, SeekableChannelsProvider)} instead
      */
+    @Deprecated
     public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
             throws IOException {
         this.channelsProvider = channelsProvider;
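With both constructors deprecated, migration is mechanical; a sketch, with uri and provider as in the earlier snippet:

    // Before (deprecated): construct directly, bypassing the channel cache.
    final ParquetFileReader before = new ParquetFileReader(uri, provider);
    // After: go through the factory, which adds the CachedChannelProvider wrapping.
    final ParquetFileReader after = ParquetFileReader.createChecked(uri, provider);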
@@ -11,6 +11,7 @@
 import io.deephaven.hash.KeyedObjectKey;
 import io.deephaven.parquet.base.ParquetUtils;
 import io.deephaven.util.annotations.VisibleForTesting;
+import io.deephaven.util.channel.SeekableChannelsProvider;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -218,6 +219,12 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
 
     public abstract Optional<Collection<List<String>>> getIndexColumns();
 
+    /**
+     * @return a {@link SeekableChannelsProvider} that can be used to open channels for reading parquet files. This is
+     *         an internal method and should not be used by clients.
+     */
+    public abstract Optional<SeekableChannelsProvider> getChannelsProvider();
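A sketch of how reading code can consume this accessor, falling back to the service loader when no provider was attached. The helper below is illustrative; its name and shape are assumptions rather than code from this PR, though ParquetTools.ensureChannelsProvider appears to play this role later in the diff:

    static SeekableChannelsProvider providerFor(final URI uri, final ParquetInstructions instructions) {
        // Prefer an explicitly attached provider; otherwise resolve one as before.
        return instructions.getChannelsProvider()
                .orElseGet(() -> SeekableChannelsProviderLoader.getInstance()
                        .fromServiceLoader(uri, instructions.getSpecialInstructions()));
    }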
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

     /**
      * Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
      * set as the provided {@link TableDefinition}.
@@ -238,6 +245,13 @@ public abstract ParquetInstructions withTableDefinitionAndLayout(final TableDefi
     @VisibleForTesting
     abstract ParquetInstructions withIndexColumns(final Collection<List<String>> indexColumns);
 
+    /**
+     * Creates a new {@link ParquetInstructions} object with the same properties as the current object but with the
+     * provided {@link SeekableChannelsProvider}.
+     */
+    @VisibleForTesting
+    public abstract ParquetInstructions withChannelsProvider(final SeekableChannelsProvider channelsProvider);
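The practical payoff of this setter: a single provider, and whatever threading or connection resources it owns, can be reused across several reads instead of being rebuilt per call. A sketch with hypothetical local paths, imports as in the earlier snippets:

    final URI rootUri = new java.io.File("/data").toURI();
    final SeekableChannelsProvider shared = SeekableChannelsProviderLoader.getInstance()
            .fromServiceLoader(rootUri, null);
    final ParquetInstructions sharedInstructions = ParquetInstructions.EMPTY.withChannelsProvider(shared);
    // Both reads reuse the same provider.
    final Table a = ParquetTools.readTable("/data/a.parquet", sharedInstructions);
    final Table b = ParquetTools.readTable("/data/b.parquet", sharedInstructions);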

     /**
      * @return the base name for partitioned parquet data. Check
      *         {@link Builder#setBaseNameForPartitionedParquetData(String) setBaseNameForPartitionedParquetData} for
@@ -349,6 +363,11 @@ public Optional<Collection<List<String>>> getIndexColumns() {
             return Optional.empty();
         }
 
+        @Override
+        public Optional<SeekableChannelsProvider> getChannelsProvider() {
+            return Optional.empty();
+        }
+
         @Override
         public ParquetInstructions withTableDefinition(@Nullable final TableDefinition tableDefinition) {
             return withTableDefinitionAndLayout(tableDefinition, null);
@@ -361,15 +380,23 @@ public ParquetInstructions withTableDefinitionAndLayout(
             return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
                     getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
                     getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
-                    fileLayout, tableDefinition, null);
+                    fileLayout, tableDefinition, null, null);
         }
 
         @Override
         ParquetInstructions withIndexColumns(final Collection<List<String>> indexColumns) {
             return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
                     getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
                     getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
-                    null, null, indexColumns);
+                    null, null, indexColumns, null);
         }
+
+        @Override
+        public ParquetInstructions withChannelsProvider(final SeekableChannelsProvider useChannelsProvider) {
+            return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
+                    getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
+                    getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
+                    null, null, null, useChannelsProvider);
+        }
     };
 
@@ -444,6 +471,7 @@ private static final class ReadOnly extends ParquetInstructions {
         private final ParquetFileLayout fileLayout;
         private final TableDefinition tableDefinition;
         private final Collection<List<String>> indexColumns;
+        private final SeekableChannelsProvider channelsProvider;
 
         private ReadOnly(
                 final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructions,
@@ -459,7 +487,8 @@ private ReadOnly(
                 final String baseNameForPartitionedParquetData,
                 final ParquetFileLayout fileLayout,
                 final TableDefinition tableDefinition,
-                final Collection<List<String>> indexColumns) {
+                final Collection<List<String>> indexColumns,
+                final SeekableChannelsProvider channelsProvider) {
             this.columnNameToInstructions = columnNameToInstructions;
             this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
             this.compressionCodecName = compressionCodecName;
@@ -477,6 +506,7 @@ private ReadOnly(
                     : indexColumns.stream()
                             .map(List::copyOf)
                             .collect(Collectors.toUnmodifiableList());
+            this.channelsProvider = channelsProvider;
         }
 
         private String getOrDefault(final String columnName, final String defaultValue,
@@ -596,6 +626,11 @@ public Optional<Collection<List<String>>> getIndexColumns() {
             return Optional.ofNullable(indexColumns);
         }
 
+        @Override
+        public Optional<SeekableChannelsProvider> getChannelsProvider() {
+            return Optional.ofNullable(channelsProvider);
+        }
+
         @Override
         public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
             return withTableDefinitionAndLayout(useDefinition, getFileLayout().orElse(null));
@@ -609,7 +644,7 @@ public ParquetInstructions withTableDefinitionAndLayout(
                     getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(),
                     isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(),
                     generateMetadataFiles(), baseNameForPartitionedParquetData(), useLayout, useDefinition,
-                    indexColumns);
+                    indexColumns, channelsProvider);
         }
 
         @Override
@@ -618,7 +653,16 @@ ParquetInstructions withIndexColumns(final Collection<List<String>> useIndexColu
                     getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(),
                     isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(),
                     generateMetadataFiles(), baseNameForPartitionedParquetData(), fileLayout,
-                    tableDefinition, useIndexColumns);
+                    tableDefinition, useIndexColumns, channelsProvider);
         }
+
+        @Override
+        public ParquetInstructions withChannelsProvider(final SeekableChannelsProvider useChannelsProvider) {
+            return new ReadOnly(columnNameToInstructions, parquetColumnNameToInstructions,
+                    getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(),
+                    isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(),
+                    generateMetadataFiles(), baseNameForPartitionedParquetData(), fileLayout,
+                    tableDefinition, indexColumns, useChannelsProvider);
+        }
 
         KeyedObjectHashMap<String, ColumnInstructions> copyColumnNameToInstructions() {
@@ -677,21 +721,30 @@ public static class Builder {
         private ParquetFileLayout fileLayout;
         private TableDefinition tableDefinition;
         private Collection<List<String>> indexColumns;
+        private SeekableChannelsProvider channelsProvider;
 
         public Builder() {}
 
-        /**
-         * Creates a new {@link ParquetInstructions} object by only copying the column name to instructions mapping and
-         * parquet column name to instructions mapping from the given {@link ParquetInstructions} object. For copying
-         * all properties, use something like {@link ParquetInstructions#withTableDefinition}.
-         */
         public Builder(final ParquetInstructions parquetInstructions) {
             if (parquetInstructions == EMPTY) {
                 return;
             }
             final ReadOnly readOnlyParquetInstructions = (ReadOnly) parquetInstructions;
             columnNameToInstructions = readOnlyParquetInstructions.copyColumnNameToInstructions();
             parquetColumnNameToInstructions = readOnlyParquetInstructions.copyParquetColumnNameToInstructions();
+            compressionCodecName = readOnlyParquetInstructions.getCompressionCodecName();
+            maximumDictionaryKeys = readOnlyParquetInstructions.getMaximumDictionaryKeys();
+            maximumDictionarySize = readOnlyParquetInstructions.getMaximumDictionarySize();
+            isLegacyParquet = readOnlyParquetInstructions.isLegacyParquet();
+            targetPageSize = readOnlyParquetInstructions.getTargetPageSize();
+            isRefreshing = readOnlyParquetInstructions.isRefreshing();
+            specialInstructions = readOnlyParquetInstructions.getSpecialInstructions();
+            generateMetadataFiles = readOnlyParquetInstructions.generateMetadataFiles();
+            baseNameForPartitionedParquetData = readOnlyParquetInstructions.baseNameForPartitionedParquetData();
+            fileLayout = readOnlyParquetInstructions.getFileLayout().orElse(null);
+            tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null);
+            indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null);
+            channelsProvider = readOnlyParquetInstructions.getChannelsProvider().orElse(null);
         }
 
         private void newColumnNameToInstructionsMap() {
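The Builder copy constructor now carries over every property, including the new channelsProvider, rather than only the two column-name mappings as before. A small sketch of the round trip this enables, with provider as in the earlier snippets:

    final ParquetInstructions base = ParquetInstructions.EMPTY.withChannelsProvider(provider);
    final ParquetInstructions copy = new ParquetInstructions.Builder(base).build();
    // Pre-PR, the copy would have kept only the column mappings; now the provider survives the round trip.
    assert copy.getChannelsProvider().orElseThrow() == base.getChannelsProvider().orElseThrow();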
@@ -968,7 +1021,7 @@ public ParquetInstructions build() {
             return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
                     maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
                     specialInstructions, generateMetadataFiles, baseNameForPartitionedParquetData, fileLayout,
-                    tableDefinition, indexColumns);
+                    tableDefinition, indexColumns, channelsProvider);
         }
     }
 
@@ -34,9 +34,9 @@
 import org.apache.parquet.schema.PrimitiveType;
 import org.jetbrains.annotations.NotNull;
 
-import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
+import java.net.URI;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -45,7 +45,9 @@
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
 
+import static io.deephaven.base.FileUtils.convertToURI;
 import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY;
+import static io.deephaven.parquet.table.ParquetTools.ensureChannelsProvider;
 
 public class ParquetSchemaReader {
     @FunctionalInterface
@@ -105,14 +107,19 @@ void reset() {
      * @param consumer A ColumnDefinitionConsumer whose accept method would be called for each column in the file
      * @return Parquet read instructions, either the ones supplied or a new object based on the supplied with necessary
      *         transformations added.
+     *
+     * @deprecated Unused method
      */
+    @Deprecated
     public static ParquetInstructions readParquetSchema(
             @NotNull final String filePath,
             @NotNull final ParquetInstructions readInstructions,
             @NotNull final ColumnDefinitionConsumer consumer,
             @NotNull final BiFunction<String, Set<String>, String> legalizeColumnNameFunc) throws IOException {
+        final URI parquetFileUri = convertToURI(filePath, false);
+        final ParquetInstructions useInstructions = ensureChannelsProvider(parquetFileUri, readInstructions);
         final ParquetFileReader parquetFileReader =
-                ParquetFileReader.createChecked(new File(filePath), readInstructions.getSpecialInstructions());
+                ParquetFileReader.createChecked(parquetFileUri, useInstructions.getChannelsProvider().orElseThrow());
         final ParquetMetadata parquetMetadata =
                 new ParquetMetadataConverter().fromParquetMetadata(parquetFileReader.fileMetaData);
         return readParquetSchema(parquetFileReader.getSchema(), parquetMetadata.getFileMetaData().getKeyValueMetaData(),
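ensureChannelsProvider itself is not shown in this diff; from its use here and the static import above, it evidently returns instructions guaranteed to carry a provider (hence the safe orElseThrow()), presumably attaching a service-loader-resolved provider when none is set, along the lines of the providerFor sketch earlier.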