Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve S3, parquet, and SeekableChannelsProvider #5137

Merged
merged 14 commits into from
Feb 14, 2024
3 changes: 3 additions & 0 deletions Util/channel/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ dependencies {
// Needed for SafeCloseable
implementation project(':Util')

// For CountingInputStream
Classpaths.inheritGuava(project)
rcaudy marked this conversation as resolved.
Show resolved Hide resolved

compileOnly depAnnotations

Classpaths.inheritJUnitPlatform(project)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
Expand Down Expand Up @@ -67,6 +68,11 @@ public SeekableChannelContext makeContext() {
return wrappedProvider.makeContext();
}

/**
 * Creates a single-use context by delegating directly to the wrapped provider; nothing is added or cached at this
 * layer.
 *
 * @return the context produced by {@code wrappedProvider.makeSingleUseContext()}
 */
@Override
public SeekableChannelContext makeSingleUseContext() {
return wrappedProvider.makeSingleUseContext();
}

@Override
public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelContext) {
return wrappedProvider.isCompatibleWith(channelContext);
Expand All @@ -86,6 +92,11 @@ public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext
return channel;
}

/**
 * Creates an input stream for {@code channel} by delegating directly to the wrapped provider.
 *
 * @param channel the channel to read from; passed through unchanged to the wrapped provider
 * @return the input stream produced by {@code wrappedProvider.getInputStream(channel)}
 * @throws IOException if the wrapped provider throws
 */
@Override
public InputStream getInputStream(SeekableByteChannel channel) throws IOException {
return wrappedProvider.getInputStream(channel);
}

@Override
public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boolean append) throws IOException {
final String pathKey = path.toAbsolutePath().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LocalFSChannelProvider implements SeekableChannelsProvider {
public class LocalFSChannelProvider extends SeekableChannelsProviderBase {

/**
 * Reports that read channels from this provider are not buffered. With a {@code false} result,
 * {@code SeekableChannelsProviderBase#getInputStream} wraps the channel's stream in a {@code BufferedInputStream}.
 *
 * @return always {@code false}
 */
@Override
protected boolean readChannelIsBuffered() {
return false;
}

@Override
public SeekableChannelContext makeContext() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import com.google.common.io.CountingInputStream;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.SeekableByteChannel;
import java.util.Objects;

public final class PositionInputStream extends FilterInputStream {

    /**
     * Wraps a channel-backed input stream {@code in}, ensuring upon {@link #close()} that {@code channel}'s
     * {@link SeekableByteChannel#position()} has been advanced by exactly the number of bytes that have been
     * consumed from the <i>resulting</i> input stream. {@code in} is closed during {@link #close()}; as such, the
     * caller must ensure that closing {@code in} does <b>not</b> close {@code channel}. To remain valid, the caller
     * must ensure that the resulting input stream isn't re-wrapped by any downstream code in a way that would
     * adversely affect the position (such as wrapping the resulting input stream with buffering).
     *
     * @param channel the channel
     * @param in the input stream based on the channel
     * @return a positional input stream
     * @throws IOException if an IO exception occurs
     */
    public static InputStream of(SeekableByteChannel channel, InputStream in) throws IOException {
        return new PositionInputStream(channel, in);
    }

    private final SeekableByteChannel ch;

    /** The channel's position captured at construction time; the base for the final seek in {@link #close()}. */
    private final long position;

    /** Set once {@link #close()} has completed successfully, so that later calls are no-ops. */
    private boolean closed;

    private PositionInputStream(SeekableByteChannel ch, InputStream in) throws IOException {
        // The counting wrapper tracks how many bytes are consumed through this stream.
        super(new CountingInputStream(in));
        this.ch = Objects.requireNonNull(ch);
        this.position = ch.position();
    }

    /**
     * Closes the wrapped input stream and then positions the channel at the starting position plus the number of
     * bytes consumed through this stream.
     *
     * <p>
     * Once a call has completed successfully, further calls have no effect, as required by
     * {@link java.io.Closeable#close()}. In particular, a repeated close will not move the channel back to the
     * position computed by the first close. A call that fails with an exception may be retried.
     *
     * @throws IOException if closing the wrapped stream or positioning the channel fails
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        super.close();
        ch.position(position + ((CountingInputStream) in).getCount());
        // Only mark closed after both steps succeeded so that a failed close can still be retried.
        closed = true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import io.deephaven.util.channel.SeekableChannelContext.Provider;

import java.util.Objects;

/**
 * A {@link Provider} that holds a single {@link SeekableChannelContext} and closes that context when the provider
 * itself is closed.
 */
final class ProviderImpl implements Provider {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
private final SeekableChannelContext context;

/**
 * @param context the context to supply from {@link #get()} and to close in {@link #close()}; must not be
 *        {@code null}
 * @throws NullPointerException if {@code context} is {@code null}
 */
public ProviderImpl(SeekableChannelContext context) {
this.context = Objects.requireNonNull(context);
}

/**
 * @return the context given at construction; the same instance on every call
 */
@Override
public SeekableChannelContext get() {
return context;
}

/**
 * Closes the held context.
 */
@Override
public void close() {
context.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;

/**
 * A {@link ReadableByteChannel} view that forwards reads and open-state queries to an underlying channel but turns
 * {@link #close()} into a no-op, leaving the underlying channel open.
 */
final class ReadableByteChannelNoClose implements ReadableByteChannel {

    /**
     * Returns a non-closing view of {@code ch}. A channel that already is such a view is returned as-is rather than
     * being wrapped a second time.
     *
     * @param ch the channel to protect from being closed
     * @return the non-closing view
     * @throws NullPointerException if {@code ch} is {@code null}
     */
    public static ReadableByteChannel of(ReadableByteChannel ch) {
        return ch instanceof ReadableByteChannelNoClose ? ch : new ReadableByteChannelNoClose(ch);
    }

    private final ReadableByteChannel delegate;

    private ReadableByteChannelNoClose(ReadableByteChannel ch) {
        delegate = Objects.requireNonNull(ch);
    }

    /** Forwards the read to the underlying channel. */
    @Override
    public int read(ByteBuffer dst) throws IOException {
        return delegate.read(dst);
    }

    /** Reports the open state of the underlying channel; unaffected by calls to {@link #close()} on this view. */
    @Override
    public boolean isOpen() {
        return delegate.isOpen();
    }

    @Override
    public void close() {
        // Intentionally empty: the underlying channel must stay open.
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,34 @@

import io.deephaven.util.SafeCloseable;

import java.io.Closeable;
import java.util.function.Supplier;

/**
* Context object for reading and writing to channels created by {@link SeekableChannelsProvider}.
*/
public interface SeekableChannelContext extends SafeCloseable {

SeekableChannelContext NULL = new SeekableChannelContext() {};
SeekableChannelContext NULL = SeekableChannelContextNull.NULL;

/**
 * Produces a {@link Provider} for a context the caller may or may not have supplied.
 *
 * <p>
 * If {@code context} is not {@link #NULL}, the returned provider simply hands back {@code context}; its
 * {@link Provider#close()} is the default no-op, so the caller-supplied context is not closed by it. Otherwise a
 * new context is created via {@link SeekableChannelsProvider#makeSingleUseContext()} and wrapped in a
 * {@code ProviderImpl}, whose {@code close()} closes that new context.
 *
 * @param provider the channels provider used to create a context when {@code context} is {@link #NULL}
 * @param context the existing context, or {@link #NULL}
 * @return a provider supplying the context to use
 */
static Provider upgrade(SeekableChannelsProvider provider, SeekableChannelContext context) {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
if (context != NULL) {
return () -> context;
}
return new ProviderImpl(provider.makeSingleUseContext());
}

/**
 * Release any resources associated with this context. The context should not be used afterward.
 */
default void close() {}

/**
 * A closeable supplier of a {@link SeekableChannelContext}. {@link #get()} is the only abstract method, so a
 * lambda can implement this interface; such a lambda inherits the no-op {@link #close()}.
 */
interface Provider extends Closeable, Supplier<SeekableChannelContext> {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved

/**
 * @return the context supplied by this provider
 */
@Override
SeekableChannelContext get();

/**
 * Does nothing by default; declared without a {@code throws} clause, narrowing {@link Closeable#close()}.
 */
@Override
default void close() {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

/**
 * Single-constant enum implementing {@link SeekableChannelContext}; its only constant is exposed as
 * {@code SeekableChannelContext.NULL}. It declares no methods of its own, so it uses the interface's default no-op
 * {@code close()}.
 */
enum SeekableChannelContextNull implements SeekableChannelContext {
NULL
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SeekableByteChannel;
Expand Down Expand Up @@ -37,11 +38,34 @@ static URI convertToURI(final String source) {
return uri;
}

/**
* Wraps {@link SeekableChannelsProvider#getInputStream(SeekableByteChannel)} in a position-safe manner. To remain
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
* valid, the caller must ensure that the resulting input stream isn't re-wrapped by any downstream code in a way
* that would adversely affect the position (such as re-wrapping the resulting input stream with buffering).
*
* <p>
* Equivalent to {@code PositionInputStream.of(ch, provider.getInputStream(ch))}.
*
* @param provider the provider
* @param ch the seekable channel
* @return the position-safe input stream
* @throws IOException if an IO exception occurs
* @see PositionInputStream#of(SeekableByteChannel, InputStream)
*/
static InputStream positionInputStream(SeekableChannelsProvider provider, SeekableByteChannel ch)
        throws IOException {
    // First obtain the provider's stream over the channel, then wrap it so that the channel's position is
    // brought in line with the bytes consumed once the returned stream is closed.
    final InputStream channelStream = provider.getInputStream(ch);
    return PositionInputStream.of(ch, channelStream);
}

/**
* Create a new {@link SeekableChannelContext} object for creating read channels via this provider.
*/
SeekableChannelContext makeContext();

/**
 * Create a new {@link SeekableChannelContext} object for a single use. The default implementation returns
 * {@link #makeContext()}.
 *
 * <p>
 * NOTE(review): presumably implementations override this to return a context tuned for one-off use; confirm
 * against the implementing providers.
 */
default SeekableChannelContext makeSingleUseContext() {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
return makeContext();
}

/**
* Check if the given context is compatible with this provider. Useful to test if we can use provided
* {@code context} object for creating channels with this provider.
Expand All @@ -56,6 +80,22 @@ default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext chann
SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull URI uri)
throws IOException;

/**
* Creates an {@link InputStream} from the current position of {@code channel}; closing the resulting input stream
* does <i>not</i> close the {@code channel}. The {@link InputStream} will be buffered; either explicitly in the
* case where the implementation uses an unbuffered {@link #getReadChannel(SeekableChannelContext, URI)}, or
* implicitly when the implementation uses a buffered {@link #getReadChannel(SeekableChannelContext, URI)}.
* {@code channel} must have been created by {@code this} provider. The caller can't assume the position of
* {@code channel} after consuming the {@link InputStream}. For use-cases that require the channel's position to be
* incremented the exact amount the {@link InputStream} has been consumed, use
* {@link #positionInputStream(SeekableChannelsProvider, SeekableByteChannel)}.
*
* @param channel the channel
* @return the input stream
* @throws IOException if an IO exception occurs
*/
InputStream getInputStream(SeekableByteChannel channel) throws IOException;

default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append) throws IOException {
return getWriteChannel(Paths.get(path), append);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;

/**
 * Base class for {@link SeekableChannelsProvider} implementations that supplies a shared, final
 * {@link #getInputStream(SeekableByteChannel)}. Subclasses only declare whether their read channels are already
 * buffered.
 */
public abstract class SeekableChannelsProviderBase implements SeekableChannelsProvider {

/**
 * @return {@code true} if this provider's read channels are already buffered, in which case
 *         {@link #getInputStream(SeekableByteChannel)} adds no buffering of its own; {@code false} to have the
 *         stream wrapped in a {@link BufferedInputStream}
 */
protected abstract boolean readChannelIsBuffered();

/**
 * Creates an {@link InputStream} over {@code channel}. The channel is first wrapped in a
 * {@code ReadableByteChannelNoClose}, whose {@code close()} is a no-op, so that closing the returned stream does
 * not close {@code channel}.
 *
 * @param channel the channel to read from
 * @return the channel's stream as-is when {@link #readChannelIsBuffered()} is {@code true}; otherwise that stream
 *         wrapped in a {@link BufferedInputStream} with an 8192-byte buffer
 */
@Override
public final InputStream getInputStream(SeekableByteChannel channel) {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
final InputStream in = Channels.newInputStream(ReadableByteChannelNoClose.of(channel));
return readChannelIsBuffered() ? in : new BufferedInputStream(in, 8192);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,18 @@ public void testReuse10() throws IOException {
}


private class TestChannelProvider implements SeekableChannelsProvider {
private class TestChannelProvider extends SeekableChannelsProviderBase {

AtomicInteger count = new AtomicInteger(0);

private final class TestChannelContext implements SeekableChannelContext {
}

@Override
protected boolean readChannelIsBuffered() {
return true; // TestMockChannel always returns 0, might as well be "buffered"
}

@Override
public SeekableChannelContext makeContext() {
return new TestChannelContext();
Expand Down
Loading
Loading