Skip to content

Commit

Permalink
Improved threading capabilities of S3+parquet (#5451)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored May 8, 2024
1 parent 25e0cb1 commit 2dbbf32
Show file tree
Hide file tree
Showing 24 changed files with 496 additions and 326 deletions.
1 change: 0 additions & 1 deletion Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//
package io.deephaven.base;

import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import org.jetbrains.annotations.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ enum ChannelType {
private final RAPriQueue<PerPathPool> releasePriority =
new RAPriQueue<>(8, PerPathPool.RAPQ_ADAPTER, PerPathPool.class);

public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
public static CachedChannelProvider create(@NotNull final SeekableChannelsProvider wrappedProvider,
final int maximumPooledCount) {
if (wrappedProvider instanceof CachedChannelProvider) {
throw new IllegalArgumentException("Cannot wrap a CachedChannelProvider in another CachedChannelProvider");
}
return new CachedChannelProvider(wrappedProvider, maximumPooledCount);
}

private CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
final int maximumPooledCount) {
this.wrappedProvider = wrappedProvider;
this.maximumPooledCount = Require.gtZero(maximumPooledCount, "maximumPooledCount");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ private SeekableChannelsProviderLoader() {
}

/**
* Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
* {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
* read files from S3.
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a
* {@link SeekableChannelsProvider} which can read files from S3.
*
* @param uri The URI
* @param object An optional object to pass to the {@link SeekableChannelsProviderPlugin} implementations.
* @param specialInstructions An optional object to pass special instructions to the provider.
* @return A {@link SeekableChannelsProvider} for the given URI.
*/
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, @Nullable final Object object) {
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri,
@Nullable final Object specialInstructions) {
for (final SeekableChannelsProviderPlugin plugin : providers) {
if (plugin.isCompatible(uri, object)) {
return plugin.createProvider(uri, object);
if (plugin.isCompatible(uri, specialInstructions)) {
return plugin.createProvider(uri, specialInstructions);
}
}
throw new UnsupportedOperationException("No plugin found for uri: " + uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand All @@ -32,7 +33,7 @@ public class CachedChannelProviderTest {
@Test
public void testSimpleRead() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int ii = 0; ii < 100; ++ii) {
final SeekableByteChannel[] sameFile = new SeekableByteChannel[10];
for (int jj = 0; jj < sameFile.length; ++jj) {
Expand All @@ -55,7 +56,7 @@ public void testSimpleRead() throws IOException {
@Test
public void testSimpleReadWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc =
((i / 100) % 2 == 0 ? cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i)
Expand All @@ -69,7 +70,7 @@ public void testSimpleReadWrite() throws IOException {
@Test
public void testSimpleWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false);
// Call write to hit the assertions inside the mock channel
Expand All @@ -86,7 +87,7 @@ public void testSimpleWrite() throws IOException {
@Test
public void testSimpleAppend() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true);
rc.close();
Expand All @@ -100,7 +101,7 @@ public void testSimpleAppend() throws IOException {
@Test
public void testCloseOrder() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 20; i++) {
List<SeekableByteChannel> channels = new ArrayList<>();
for (int j = 0; j < 50; j++) {
Expand All @@ -121,7 +122,7 @@ public void testCloseOrder() throws IOException {
@Test
public void testReuse() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 50);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 50);
final SeekableByteChannel[] someResult = new SeekableByteChannel[50];
final ByteBuffer buffer = ByteBuffer.allocate(1);
for (int ci = 0; ci < someResult.length; ++ci) {
Expand Down Expand Up @@ -149,7 +150,7 @@ public void testReuse() throws IOException {
@Test
public void testReuse10() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
final SeekableByteChannel[] someResult = new SeekableByteChannel[100];
for (int pi = 0; pi < 10; ++pi) {
for (int ci = 0; ci < 10; ++ci) {
Expand All @@ -173,6 +174,17 @@ public void testReuse10() throws IOException {
assertEquals(0, closed.size());
}

@Test
void testRewrapCachedChannelProvider() {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
try {
CachedChannelProvider.create(cachedChannelProvider, 100);
fail("Expected IllegalArgumentException on rewrapping CachedChannelProvider");
} catch (final IllegalArgumentException expected) {
}
}


private class TestChannelProvider implements SeekableChannelsProvider {

Expand Down
26 changes: 26 additions & 0 deletions Util/src/main/java/io/deephaven/util/thread/ThreadHelpers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

public class ThreadHelpers {
/**
* Get the number of threads to use for a given configuration key, defaulting to the number of available processors
* if the configuration key is set to a non-positive value, or the configuration key is not set and the provided
* default is non-positive.
*
* @param configKey The configuration key to look up
* @param defaultValue The default value to use if the configuration key is not set
* @return The number of threads to use
*/
public static int getOrComputeThreadCountProperty(final String configKey, final int defaultValue) {
final int numThreads = Configuration.getInstance().getIntegerWithDefault(configKey, defaultValue);
if (numThreads <= 0) {
return Runtime.getRuntime().availableProcessors();
} else {
return numThreads;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.engine.table.impl;

import io.deephaven.chunk.util.pools.MultiChunkPool;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.util.thread.NamingThreadFactory;
Expand All @@ -17,6 +16,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static io.deephaven.util.thread.ThreadHelpers.getOrComputeThreadCountProperty;

/**
* Implementation of OperationInitializer that delegates to a pool of threads.
*/
Expand All @@ -25,17 +26,8 @@ public class OperationInitializationThreadPool implements OperationInitializer {
/**
* The number of threads that will be used for parallel initialization in this process
*/
public static final int NUM_THREADS;

static {
final int numThreads =
Configuration.getInstance().getIntegerWithDefault("OperationInitializationThreadPool.threads", -1);
if (numThreads <= 0) {
NUM_THREADS = Runtime.getRuntime().availableProcessors();
} else {
NUM_THREADS = numThreads;
}
}
private static final int NUM_THREADS =
getOrComputeThreadCountProperty("OperationInitializationThreadPool.threads", -1);
private final ThreadLocal<Boolean> isInitializationThread = ThreadLocal.withInitial(() -> false);

private final ThreadPoolExecutor executorService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3890,7 +3890,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");

final Table loaded = ParquetTools.readPartitionedTableInferSchema(
new ParquetKeyValuePartitionedLayout(testRootFile, 2, ParquetInstructions.EMPTY),
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
ParquetInstructions.EMPTY);

// verify the sources are identical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
//
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.apache.parquet.format.*;
import org.apache.parquet.format.ColumnOrder;
import org.apache.parquet.format.Type;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.util.*;
Expand All @@ -44,94 +42,50 @@ public class ParquetFileReader {

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
* {@link UncheckedIOException}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final File parquetFile,
@Nullable final Object specialInstructions) {
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return createChecked(parquetFile, specialInstructions);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
* {@link UncheckedIOException}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final URI parquetFileURI,
@Nullable final Object specialInstructions) {
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return createChecked(parquetFileURI, specialInstructions);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
return new ParquetFileReader(parquetFileURI, channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFileURI, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final File parquetFile,
@Nullable final Object specialInstructions) throws IOException {
return createChecked(convertToURI(parquetFile, false), specialInstructions);
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final URI parquetFileURI,
@Nullable final Object specialInstructions) throws IOException {
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
parquetFileURI, specialInstructions);
return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(provider, 1 << 7));
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param source The source path or URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*/
public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider)
throws IOException {
this(convertToURI(source, false), channelsProvider);
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @param provider The {@link SeekableChannelsProvider} to use for reading the file
*/
public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
throws IOException {
this.channelsProvider = channelsProvider;
private ParquetFileReader(
@NotNull final URI parquetFileURI,
@NotNull final SeekableChannelsProvider provider) throws IOException {
this.channelsProvider = CachedChannelProvider.create(provider, 1 << 7);
if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) {
// Construct a new file URI for the parent directory
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
Expand Down Expand Up @@ -270,7 +224,7 @@ private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {

/**
* Create a {@link RowGroupReader} object for provided row group number
*
*
* @param version The "version" string from deephaven specific parquet metadata, or null if it's not present.
*/
public RowGroupReader getRowGroup(final int groupNumber, final String version) {
Expand Down Expand Up @@ -506,7 +460,7 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedTyp

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
Expand Down
Loading

0 comments on commit 2dbbf32

Please sign in to comment.