diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java
index 92c62dd1d00..98cf03908e8 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java
@@ -10,7 +10,6 @@
 import io.deephaven.chunk.attributes.Values;
 import io.deephaven.chunk.util.pools.PoolableChunk;
 import io.deephaven.engine.rowset.RowSet;
-import io.deephaven.extensions.barrage.ColumnConversionMode;
 import io.deephaven.extensions.barrage.util.DefensiveDrainable;
 import io.deephaven.extensions.barrage.util.StreamReaderOptions;
 import io.deephaven.time.DateTimeUtils;
@@ -19,7 +18,6 @@
 import io.deephaven.chunk.Chunk;
 import io.deephaven.chunk.ChunkType;
 import io.deephaven.util.SafeCloseable;
-import io.deephaven.util.type.TypeUtils;
 import io.deephaven.vector.Vector;
 import org.jetbrains.annotations.Nullable;
 
@@ -31,7 +29,6 @@
 import java.time.LocalDate;
 import java.time.LocalTime;
 import java.time.ZonedDateTime;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.PrimitiveIterator;
 
@@ -187,6 +184,7 @@ static ChunkInputStreamGenerator makeInputStreamGenerator(
         }
     }
 
+    @Deprecated
     static WritableChunk<Values> extractChunkFromInputStream(
             final StreamReaderOptions options,
             final ChunkType chunkType, final Class<?> type, final Class<?> componentType,
@@ -195,10 +193,10 @@ static WritableChunk<Values> extractChunkFromInputStream(
             final DataInput is,
             final WritableChunk<Values> outChunk, final int offset, final int totalRows) throws IOException {
         return extractChunkFromInputStream(options, 1, chunkType, type, componentType, fieldNodeIter, bufferInfoIter,
-                is,
-                outChunk, offset, totalRows);
+                is, outChunk, offset, totalRows);
     }
 
+    @Deprecated
     static WritableChunk<Values> extractChunkFromInputStream(
             final StreamReaderOptions options,
             final int factor,
@@ -207,170 +205,9 @@ static WritableChunk<Values> extractChunkFromInputStream(
             final PrimitiveIterator.OfLong bufferInfoIter,
             final DataInput is,
             final WritableChunk<Values> outChunk, final int outOffset, final int totalRows) throws IOException {
-        // TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
-        switch (chunkType) {
-            case Boolean:
-                throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
-            case Char:
-                return CharChunkInputStreamGenerator.extractChunkFromInputStream(
-                        Character.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-            case Byte:
-                if (type == Boolean.class || type == boolean.class) {
-                    return BooleanChunkInputStreamGenerator.extractChunkFromInputStream(
-                            options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                return ByteChunkInputStreamGenerator.extractChunkFromInputStream(
-                        Byte.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-            case Short:
-                return ShortChunkInputStreamGenerator.extractChunkFromInputStream(
-                        Short.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-            case Int:
-                return IntChunkInputStreamGenerator.extractChunkFromInputStream(
-                        Integer.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-            case Long:
-                if (factor == 1) {
-                    return LongChunkInputStreamGenerator.extractChunkFromInputStream(
-                            Long.BYTES, options,
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion(
-                        Long.BYTES, options,
-                        (long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor),
-                        fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-            case Float:
-                return FloatChunkInputStreamGenerator.extractChunkFromInputStream(
-                        Float.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-            case Double:
-                return DoubleChunkInputStreamGenerator.extractChunkFromInputStream(
-                        Double.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-            case Object:
-                if (type.isArray()) {
-                    if (componentType == byte.class) {
-                        return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                                is,
-                                fieldNodeIter,
-                                bufferInfoIter,
-                                (buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
-                                outChunk, outOffset, totalRows);
-                    } else {
-                        return VarListChunkInputStreamGenerator.extractChunkFromInputStream(
-                                options, type, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                    }
-                }
-                if (Vector.class.isAssignableFrom(type)) {
-                    // noinspection unchecked
-                    return VectorChunkInputStreamGenerator.extractChunkFromInputStream(
-                            options, (Class<Vector<?>>) type, componentType, fieldNodeIter, bufferInfoIter, is,
-                            outChunk, outOffset, totalRows);
-                }
-                if (type == BigInteger.class) {
-                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                            is,
-                            fieldNodeIter,
-                            bufferInfoIter,
-                            BigInteger::new,
-                            outChunk, outOffset, totalRows);
-                }
-                if (type == BigDecimal.class) {
-                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                            is,
-                            fieldNodeIter,
-                            bufferInfoIter,
-                            (final byte[] buf, final int offset, final int length) -> {
-                                // read the int scale value as little endian, arrow's endianness.
-                                final byte b1 = buf[offset];
-                                final byte b2 = buf[offset + 1];
-                                final byte b3 = buf[offset + 2];
-                                final byte b4 = buf[offset + 3];
-                                final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
-                                return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
-                            },
-                            outChunk, outOffset, totalRows);
-                }
-                if (type == Instant.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Long.BYTES, options, io -> {
-                                final long value = io.readLong();
-                                if (value == QueryConstants.NULL_LONG) {
-                                    return null;
-                                }
-                                return DateTimeUtils.epochNanosToInstant(value * factor);
-                            },
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == ZonedDateTime.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Long.BYTES, options, io -> {
-                                final long value = io.readLong();
-                                if (value == QueryConstants.NULL_LONG) {
-                                    return null;
-                                }
-                                return DateTimeUtils.epochNanosToZonedDateTime(
-                                        value * factor, DateTimeUtils.timeZone());
-                            },
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == Byte.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Byte.BYTES, options, io -> TypeUtils.box(io.readByte()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == Character.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Character.BYTES, options, io -> TypeUtils.box(io.readChar()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == Double.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Double.BYTES, options, io -> TypeUtils.box(io.readDouble()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == Float.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Float.BYTES, options, io -> TypeUtils.box(io.readFloat()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == Integer.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Integer.BYTES, options, io -> TypeUtils.box(io.readInt()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == Long.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Long.BYTES, options, io -> TypeUtils.box(io.readLong()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == Short.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Short.BYTES, options, io -> TypeUtils.box(io.readShort()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == LocalDate.class) {
-                    return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
-                            Long.BYTES, options,
-                            value -> value == QueryConstants.NULL_LONG
-                                    ? null
-                                    : LocalDate.ofEpochDay(value / MS_PER_DAY),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == LocalTime.class) {
-                    return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
-                            Long.BYTES, options,
-                            value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                if (type == String.class ||
-                        options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
-                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is, fieldNodeIter,
-                            bufferInfoIter,
-                            (buf, off, len) -> new String(buf, off, len, Charsets.UTF_8), outChunk, outOffset,
-                            totalRows);
-                }
-                throw new UnsupportedOperationException(
-                        "Do not yet support column conversion mode: " + options.columnConversionMode());
-            default:
-                throw new UnsupportedOperationException();
-        }
+        return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
+                new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null), fieldNodeIter,
+                bufferInfoIter, is, outChunk, outOffset, totalRows);
     }
 
     /**
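Note on the hunk above: both static extractChunkFromInputStream overloads are now deprecated shims over the new factory API. A minimal migration sketch for a hypothetical caller follows; everything except the Deephaven types is invented for illustration, and the null Field mirrors exactly what the deprecated shim itself passes.

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;

final class MigrationSketch {
    // Replaces a call to the deprecated ChunkInputStreamGenerator.extractChunkFromInputStream(...).
    static WritableChunk<Values> readColumn(
            final StreamReaderOptions options,
            final ChunkType chunkType, final Class<?> type, final Class<?> componentType,
            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
            final PrimitiveIterator.OfLong bufferInfoIter,
            final DataInput is) throws IOException {
        // A null arrowField is what the deprecated shim supplies; callers that hold the Arrow
        // schema should pass the column's real Field so readers can consult it.
        return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(
                options,
                new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null),
                fieldNodeIter, bufferInfoIter, is, null, 0, 0);
    }
}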
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java
new file mode 100644
index 00000000000..d7cfb18db00
--- /dev/null
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java
@@ -0,0 +1,116 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.barrage.chunk;
+
+import io.deephaven.chunk.ChunkType;
+import io.deephaven.chunk.WritableChunk;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.extensions.barrage.util.StreamReaderOptions;
+import org.apache.arrow.flatbuf.Field;
+import org.apache.arrow.flatbuf.Type;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+
+/**
+ * Supports reading chunks of columnar data from a stream, as described by a chunk type and Arrow field metadata.
+ */
+public interface ChunkReadingFactory {
+    /**
+     * Describes the chunk type, java types, and Arrow field of a column to be read.
+     */
+    class ChunkTypeInfo {
+        private final ChunkType chunkType;
+        private final Class<?> type;
+        private final Class<?> componentType;
+        private final Field arrowField;
+
+        public ChunkTypeInfo(ChunkType chunkType, Class<?> type, Class<?> componentType, Field arrowField) {
+            this.chunkType = chunkType;
+            this.type = type;
+            this.componentType = componentType;
+            this.arrowField = arrowField;
+        }
+
+        public ChunkType chunkType() {
+            return chunkType;
+        }
+
+        public Class<?> type() {
+            return type;
+        }
+
+        public Class<?> componentType() {
+            return componentType;
+        }
+
+        public Field arrowField() {
+            return arrowField;
+        }
+
+        public Field componentArrowField() {
+            if (arrowField.typeType() != Type.List) {
+                throw new IllegalStateException("Not a flight List");
+            }
+            if (arrowField.childrenLength() != 1) {
+                throw new IllegalStateException("Incorrect number of child Fields");
+            }
+            return arrowField.children(0);
+        }
+    }
+
+    /**
+     * Reads a chunk of data for a single column from the given stream.
+     *
+     * @param options options for reading the stream
+     * @param factor a multiplicative conversion factor to apply when reading long values
+     * @param typeInfo the type of data to read into a chunk
+     * @param fieldNodeIter iterator to read field nodes from the stream
+     * @param bufferInfoIter iterator to read buffer lengths from the stream
+     * @param is the input stream to read data from
+     * @param outChunk the chunk to write into, or null to allocate a new chunk
+     * @param outOffset the offset within outChunk to begin writing
+     * @param totalRows the total rows to read from the stream
+     * @return a chunk containing the data read from the stream
+     * @throws IOException if an error occurred while reading the stream
+     */
+    WritableChunk<Values> extractChunkFromInputStream(
+            final StreamReaderOptions options,
+            final int factor,
+            final ChunkTypeInfo typeInfo,
+            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            final PrimitiveIterator.OfLong bufferInfoIter,
+            final DataInput is,
+            final WritableChunk<Values> outChunk,
+            final int outOffset,
+            final int totalRows) throws IOException;
+
+    /**
+     * Reads a chunk of data for a single column from the given stream, with no conversion factor.
+     *
+     * @param options options for reading the stream
+     * @param typeInfo the type of data to read into a chunk
+     * @param fieldNodeIter iterator to read field nodes from the stream
+     * @param bufferInfoIter iterator to read buffer lengths from the stream
+     * @param is the input stream to read data from
+     * @param outChunk the chunk to write into, or null to allocate a new chunk
+     * @param offset the offset within outChunk to begin writing
+     * @param totalRows the total rows to read from the stream
+     * @return a chunk containing the data read from the stream
+     * @throws IOException if an error occurred while reading the stream
+     */
+    default WritableChunk<Values> extractChunkFromInputStream(
+            final StreamReaderOptions options,
+            final ChunkTypeInfo typeInfo,
+            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            final PrimitiveIterator.OfLong bufferInfoIter,
+            final DataInput is,
+            final WritableChunk<Values> outChunk,
+            final int offset,
+            final int totalRows) throws IOException {
+        return extractChunkFromInputStream(options, 1, typeInfo, fieldNodeIter, bufferInfoIter, is, outChunk, offset,
+                totalRows);
+    }
+
+}
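This interface is the extension point the patch introduces. As an illustration of how an alternate implementation could plug in, here is a hypothetical delegating factory that could special-case a type (for example, by inspecting typeInfo.arrowField()) before falling back to the default; it is a sketch, not part of the patch.

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;

public final class CustomChunkReadingFactory implements ChunkReadingFactory {
    @Override
    public WritableChunk<Values> extractChunkFromInputStream(
            StreamReaderOptions options, int factor, ChunkReadingFactory.ChunkTypeInfo typeInfo,
            Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
            PrimitiveIterator.OfLong bufferInfoIter, DataInput is,
            WritableChunk<Values> outChunk, int outOffset, int totalRows) throws IOException {
        // A real implementation might inspect typeInfo (including typeInfo.arrowField())
        // and handle additional wire formats here, then delegate everything else.
        return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(
                options, factor, typeInfo, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
    }
}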
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
new file mode 100644
index 00000000000..b8146dfda9c
--- /dev/null
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
@@ -0,0 +1,207 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.barrage.chunk;
+
+import com.google.common.base.Charsets;
+import io.deephaven.chunk.WritableChunk;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.extensions.barrage.ColumnConversionMode;
+import io.deephaven.extensions.barrage.util.StreamReaderOptions;
+import io.deephaven.time.DateTimeUtils;
+import io.deephaven.util.QueryConstants;
+import io.deephaven.util.type.TypeUtils;
+import io.deephaven.vector.Vector;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+
+import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MS_PER_DAY;
+
+/**
+ * JVM implementation of ChunkReadingFactory, suitable for use in Java clients and servers. This default implementation
+ * may not round trip Flight types correctly, but it will round trip Deephaven table definitions and table data.
+ * Neither of these is a required or expected property of a Flight/Barrage/Deephaven client.
+ */
+public final class DefaultChunkReadingFactory implements ChunkReadingFactory {
+    public static final ChunkReadingFactory INSTANCE = new DefaultChunkReadingFactory();
+
+    @Override
+    public WritableChunk<Values> extractChunkFromInputStream(StreamReaderOptions options, int factor,
+            ChunkTypeInfo typeInfo, Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+            int totalRows) throws IOException {
+        // TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
+        switch (typeInfo.chunkType()) {
+            case Boolean:
+                throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
+            case Char:
+                return CharChunkInputStreamGenerator.extractChunkFromInputStream(
+                        Character.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+            case Byte:
+                if (typeInfo.type() == Boolean.class || typeInfo.type() == boolean.class) {
+                    return BooleanChunkInputStreamGenerator.extractChunkFromInputStream(
+                            options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                return ByteChunkInputStreamGenerator.extractChunkFromInputStream(
+                        Byte.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+            case Short:
+                return ShortChunkInputStreamGenerator.extractChunkFromInputStream(
+                        Short.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+            case Int:
+                return IntChunkInputStreamGenerator.extractChunkFromInputStream(
+                        Integer.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+            case Long:
+                if (factor == 1) {
+                    return LongChunkInputStreamGenerator.extractChunkFromInputStream(
+                            Long.BYTES, options,
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion(
+                        Long.BYTES, options,
+                        (long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor),
+                        fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+            case Float:
+                return FloatChunkInputStreamGenerator.extractChunkFromInputStream(
+                        Float.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+            case Double:
+                return DoubleChunkInputStreamGenerator.extractChunkFromInputStream(
+                        Double.BYTES, options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+            case Object:
+                if (typeInfo.type().isArray()) {
+                    if (typeInfo.componentType() == byte.class) {
+                        return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                                is,
+                                fieldNodeIter,
+                                bufferInfoIter,
+                                (buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
+                                outChunk, outOffset, totalRows);
+                    } else {
+                        return VarListChunkInputStreamGenerator.extractChunkFromInputStream(options, typeInfo,
+                                fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows, this);
+                    }
+                }
+                if (Vector.class.isAssignableFrom(typeInfo.type())) {
+                    return VectorChunkInputStreamGenerator.extractChunkFromInputStream(options,
+                            typeInfo, fieldNodeIter, bufferInfoIter,
+                            is, outChunk, outOffset, totalRows, this);
+                }
+                if (typeInfo.type() == BigInteger.class) {
+                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                            is,
+                            fieldNodeIter,
+                            bufferInfoIter,
+                            BigInteger::new,
+                            outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == BigDecimal.class) {
+                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                            is,
+                            fieldNodeIter,
+                            bufferInfoIter,
+                            (final byte[] buf, final int offset, final int length) -> {
+                                // read the int scale value as little endian, arrow's endianness.
+                                final byte b1 = buf[offset];
+                                final byte b2 = buf[offset + 1];
+                                final byte b3 = buf[offset + 2];
+                                final byte b4 = buf[offset + 3];
+                                final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
+                                return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
+                            },
+                            outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == Instant.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Long.BYTES, options, io -> {
+                                final long value = io.readLong();
+                                if (value == QueryConstants.NULL_LONG) {
+                                    return null;
+                                }
+                                return DateTimeUtils.epochNanosToInstant(value * factor);
+                            },
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == ZonedDateTime.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Long.BYTES, options, io -> {
+                                final long value = io.readLong();
+                                if (value == QueryConstants.NULL_LONG) {
+                                    return null;
+                                }
+                                return DateTimeUtils.epochNanosToZonedDateTime(
+                                        value * factor, DateTimeUtils.timeZone());
+                            },
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == Byte.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Byte.BYTES, options, io -> TypeUtils.box(io.readByte()),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == Character.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Character.BYTES, options, io -> TypeUtils.box(io.readChar()),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == Double.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Double.BYTES, options, io -> TypeUtils.box(io.readDouble()),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == Float.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Float.BYTES, options, io -> TypeUtils.box(io.readFloat()),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == Integer.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Integer.BYTES, options, io -> TypeUtils.box(io.readInt()),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == Long.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Long.BYTES, options, io -> TypeUtils.box(io.readLong()),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == Short.class) {
+                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
+                            Short.BYTES, options, io -> TypeUtils.box(io.readShort()),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == LocalDate.class) {
+                    return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
+                            Long.BYTES, options,
+                            value -> value == QueryConstants.NULL_LONG
+                                    ? null
+                                    : LocalDate.ofEpochDay(value / MS_PER_DAY),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == LocalTime.class) {
+                    return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
+                            Long.BYTES, options,
+                            value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value),
+                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                if (typeInfo.type() == String.class ||
+                        options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
+                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is, fieldNodeIter,
+                            bufferInfoIter,
+                            (buf, off, len) -> new String(buf, off, len, Charsets.UTF_8), outChunk, outOffset,
+                            totalRows);
+                }
+                throw new UnsupportedOperationException(
+                        "Do not yet support column conversion mode: " + options.columnConversionMode());
+            default:
+                throw new UnsupportedOperationException();
+        }
+    }
+}
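The BigDecimal branch above decodes a wire layout of a 4-byte little-endian scale followed by a big-endian two's-complement unscaled value. Below is a standalone, runnable example of that exact decode; the encoding side is constructed by hand here purely to exercise it (the layout is taken from the code above, not from a separate spec).

import java.math.BigDecimal;
import java.math.BigInteger;

public final class BigDecimalWireExample {
    public static void main(String[] args) {
        // Encode 12.34: unscaled value 1234 (big endian), scale 2 (little endian).
        byte[] unscaled = BigInteger.valueOf(1234).toByteArray(); // {0x04, 0xD2}
        byte[] buf = new byte[4 + unscaled.length];
        buf[0] = 2; // scale bytes, little endian: 02 00 00 00
        System.arraycopy(unscaled, 0, buf, 4, unscaled.length);

        // Decode exactly as the factory's lambda does.
        int offset = 0, length = buf.length;
        final int scale = buf[offset + 3] << 24 | (buf[offset + 2] & 0xFF) << 16
                | (buf[offset + 1] & 0xFF) << 8 | (buf[offset] & 0xFF);
        BigDecimal value = new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
        System.out.println(value); // prints 12.34
    }
}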
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
index 0a109230ca6..e1075e7dcf3 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
@@ -235,19 +235,20 @@ public int drainTo(final OutputStream outputStream) throws IOException {
     static WritableObjectChunk<Object, Values> extractChunkFromInputStream(
             final StreamReaderOptions options,
-            final Class<?> type,
+            final ChunkReadingFactory.ChunkTypeInfo typeInfo,
             final Iterator<FieldNodeInfo> fieldNodeIter,
             final PrimitiveIterator.OfLong bufferInfoIter,
             final DataInput is,
             final WritableChunk<Values> outChunk,
             final int outOffset,
-            final int totalRows) throws IOException {
+            final int totalRows,
+            ChunkReadingFactory chunkReadingFactory) throws IOException {
 
         final FieldNodeInfo nodeInfo = fieldNodeIter.next();
         final long validityBuffer = bufferInfoIter.nextLong();
         final long offsetsBuffer = bufferInfoIter.nextLong();
 
-        final Class<?> componentType = type.getComponentType();
+        final Class<?> componentType = typeInfo.type().getComponentType();
         final Class<?> innerComponentType = componentType != null ? componentType.getComponentType() : null;
 
         final ChunkType chunkType;
@@ -259,8 +260,11 @@ static WritableObjectChunk<Object, Values> extractChunkFromInputStream(
         }
 
         if (nodeInfo.numElements == 0) {
-            try (final WritableChunk<Values> ignored = ChunkInputStreamGenerator.extractChunkFromInputStream(
-                    options, chunkType, componentType, innerComponentType, fieldNodeIter,
+            try (final WritableChunk<Values> ignored = chunkReadingFactory.extractChunkFromInputStream(
+                    options,
+                    new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
+                            typeInfo.componentArrowField()),
+                    fieldNodeIter,
                     bufferInfoIter, is, null, 0, 0)) {
                 return WritableObjectChunk.makeWritableChunk(nodeInfo.numElements);
             }
@@ -299,8 +303,10 @@ static WritableObjectChunk<Object, Values> extractChunkFromInputStream(
         }
 
         final ArrayExpansionKernel kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentType);
-        try (final WritableChunk<Values> inner = ChunkInputStreamGenerator.extractChunkFromInputStream(
-                options, chunkType, componentType, innerComponentType,
+        try (final WritableChunk<Values> inner = chunkReadingFactory.extractChunkFromInputStream(
+                options,
+                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
+                        typeInfo.componentArrowField()),
                 fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
             chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
index 35245b11631..b7bb8cee6a4 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
@@ -235,25 +235,29 @@ public int drainTo(final OutputStream outputStream) throws IOException {
     static WritableObjectChunk<Vector<?>, Values> extractChunkFromInputStream(
             final StreamReaderOptions options,
-            final Class<Vector<?>> type,
-            final Class<?> inComponentType,
+            final ChunkReadingFactory.ChunkTypeInfo typeInfo,
             final Iterator<FieldNodeInfo> fieldNodeIter,
             final PrimitiveIterator.OfLong bufferInfoIter,
             final DataInput is,
             final WritableChunk<Values> outChunk,
             final int outOffset,
-            final int totalRows) throws IOException {
+            final int totalRows,
+            ChunkReadingFactory chunkReadingFactory) throws IOException {
 
         final FieldNodeInfo nodeInfo = fieldNodeIter.next();
         final long validityBuffer = bufferInfoIter.nextLong();
         final long offsetsBuffer = bufferInfoIter.nextLong();
 
-        final Class<?> componentType = VectorExpansionKernel.getComponentType(type, inComponentType);
+        final Class<?> componentType =
+                VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType());
         final ChunkType chunkType = ChunkType.fromElementType(componentType);
 
         if (nodeInfo.numElements == 0) {
-            try (final WritableChunk<Values> ignored = ChunkInputStreamGenerator.extractChunkFromInputStream(
-                    options, chunkType, componentType, componentType.getComponentType(), fieldNodeIter, bufferInfoIter,
+            try (final WritableChunk<Values> ignored = chunkReadingFactory.extractChunkFromInputStream(
+                    options,
+                    new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
+                            typeInfo.componentArrowField()),
+                    fieldNodeIter, bufferInfoIter,
                     is, null, 0, 0)) {
                 if (outChunk != null) {
@@ -296,8 +300,11 @@ static WritableObjectChunk<Vector<?>, Values> extractChunkFromInputStream(
         }
 
         final VectorExpansionKernel kernel = VectorExpansionKernel.makeExpansionKernel(chunkType, componentType);
-        try (final WritableChunk<Values> inner = ChunkInputStreamGenerator.extractChunkFromInputStream(
-                options, chunkType, componentType, componentType.getComponentType(), fieldNodeIter, bufferInfoIter,
+        try (final WritableChunk<Values> inner = chunkReadingFactory.extractChunkFromInputStream(
+                options,
+                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
+                        typeInfo.componentArrowField()),
+                fieldNodeIter, bufferInfoIter,
                 is, null, 0, 0)) {
             chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);
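In both files above, the nested-data paths now recurse through the injected ChunkReadingFactory rather than the static helper, deriving the child Field via ChunkTypeInfo.componentArrowField() (which requires an Arrow List field with exactly one child). A hypothetical helper, not part of the patch, showing the recursion pattern the two call sites share:

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;

final class ListReadSketch {
    static WritableChunk<Values> readListComponent(
            final ChunkReadingFactory factory,
            final StreamReaderOptions options,
            final ChunkReadingFactory.ChunkTypeInfo listTypeInfo,
            final ChunkType componentChunkType, final Class<?> componentType, final Class<?> innerComponentType,
            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
            final PrimitiveIterator.OfLong bufferInfoIter, final DataInput is) throws IOException {
        // componentArrowField() throws IllegalStateException unless the outer field is an
        // Arrow List with exactly one child; that child describes the element type.
        return factory.extractChunkFromInputStream(
                options,
                new ChunkReadingFactory.ChunkTypeInfo(componentChunkType, componentType, innerComponentType,
                        listTypeInfo.componentArrowField()),
                fieldNodeIter, bufferInfoIter, is, null, 0, 0);
    }
}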
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java
index 71f8a81b0fe..b0c62c652c1 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java
@@ -13,6 +13,8 @@
 import io.deephaven.engine.table.impl.util.BarrageMessage;
 import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
 import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
+import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
+import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
 import io.deephaven.extensions.barrage.table.BarrageTable;
 import io.deephaven.io.streams.ByteBufferInputStream;
 import io.deephaven.proto.util.Exceptions;
@@ -45,6 +47,7 @@ public class ArrowToTableConverter {
     private Class<?>[] columnTypes;
     private Class<?>[] componentTypes;
     protected BarrageSubscriptionOptions options = DEFAULT_SER_OPTIONS;
+    private Schema schema;
 
     private volatile boolean completed = false;
 
@@ -136,6 +139,7 @@ public synchronized void onCompleted() throws InterruptedException {
     }
 
     protected void parseSchema(final Schema header) {
+        this.schema = header;
         // The Schema instance (especially originated from Python) can't be assumed to be valid after the return
         // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to make a copy of
         // the header to use after the return of this method.
@@ -194,8 +198,10 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i
             msg.addColumnData[ci].data = new ArrayList<>();
             final int factor = (columnConversionFactors == null) ? 1 : columnConversionFactors[ci];
             try {
-                acd.data.add(ChunkInputStreamGenerator.extractChunkFromInputStream(options, factor,
-                        columnChunkTypes[ci], columnTypes[ci], componentTypes[ci], fieldNodeIter,
+                acd.data.add(DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
+                        new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci], columnTypes[ci], componentTypes[ci],
+                                schema.fields(ci)),
+                        fieldNodeIter,
                         bufferInfoIter, mi.inputStream, null, 0, 0));
             } catch (final IOException unexpected) {
                 throw new UncheckedDeephavenException(unexpected);
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java
index d535ffd0254..571082227db 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java
@@ -19,6 +19,8 @@
 import io.deephaven.engine.rowset.RowSetShiftData;
 import io.deephaven.engine.table.impl.util.*;
 import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
+import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
+import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
 import io.deephaven.util.datastructures.LongSizedDataStructure;
 import io.deephaven.chunk.ChunkType;
 import io.deephaven.internal.log.LoggerFactory;
@@ -26,6 +28,7 @@
 import org.apache.arrow.flatbuf.Message;
 import org.apache.arrow.flatbuf.MessageHeader;
 import org.apache.arrow.flatbuf.RecordBatch;
+import org.apache.arrow.flatbuf.Schema;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -51,8 +54,11 @@ public class BarrageStreamReader implements StreamReader {
     private long numModRowsRead = 0;
     private long numModRowsTotal = 0;
 
+    private Schema schema;
     private BarrageMessage msg = null;
 
+    private final ChunkReadingFactory chunkReadingFactory = DefaultChunkReadingFactory.INSTANCE;
+
     public BarrageStreamReader(final LongConsumer deserializeTmConsumer) {
         this.deserializeTmConsumer = deserializeTmConsumer;
     }
@@ -239,8 +245,10 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
 
                         // fill the chunk with data and assign back into the array
                         acd.data.set(lastChunkIndex,
-                                ChunkInputStreamGenerator.extractChunkFromInputStream(options, columnChunkTypes[ci],
-                                        columnTypes[ci], componentTypes[ci], fieldNodeIter, bufferInfoIter, ois,
+                                chunkReadingFactory.extractChunkFromInputStream(options,
+                                        new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci],
+                                                columnTypes[ci], componentTypes[ci], schema.fields(ci)),
+                                        fieldNodeIter, bufferInfoIter, ois,
                                         chunk, chunk.size(), (int) batch.length()));
                         chunk.setSize(chunk.size() + (int) batch.length());
                     }
@@ -270,8 +278,10 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
 
                         // fill the chunk with data and assign back into the array
                         mcd.data.set(lastChunkIndex,
-                                ChunkInputStreamGenerator.extractChunkFromInputStream(options, columnChunkTypes[ci],
-                                        columnTypes[ci], componentTypes[ci], fieldNodeIter, bufferInfoIter, ois,
+                                chunkReadingFactory.extractChunkFromInputStream(options,
+                                        new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci],
+                                                columnTypes[ci], componentTypes[ci], null),
+                                        fieldNodeIter, bufferInfoIter, ois,
                                         chunk, chunk.size(), numRowsToRead));
                         chunk.setSize(chunk.size() + numRowsToRead);
                     }
@@ -282,6 +292,7 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
 
             if (header != null && header.headerType() == MessageHeader.Schema) {
                 // there is no body and our clients do not want to see schema messages
+                this.schema = (Schema) header.header(new Schema());
                 return null;
            }
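One ordering assumption in the reader above is worth noting: Arrow sends the Schema message before any record batch, and safelyParseFrom now caches it so add-column reads can resolve schema.fields(ci) (mod columns pass a null Field). A hypothetical defensive accessor, not part of the patch, that makes the assumption explicit:

// Hypothetical guard inside BarrageStreamReader; the patch itself relies on the
// Schema message always preceding record batches.
private org.apache.arrow.flatbuf.Field fieldFor(final int ci) {
    if (schema == null) {
        throw new IllegalStateException("record batch received before Arrow Schema message");
    }
    return schema.fields(ci);
}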
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
index f8933e6aee0..ab412a1d830 100755
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
@@ -510,6 +510,10 @@ public Class<?>[] computeWireComponentTypes() {
         return tableDef.getColumnStream()
                 .map(ColumnDefinition::getComponentType).toArray(Class[]::new);
     }
+
+    // public Field[] fields() {
+    //     return
+    // }
 }
 
 private static void setConversionFactor(
diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java
index e0e0b1f7741..8ff73e27d93 100644
--- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java
+++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java
@@ -322,7 +322,7 @@ public MethodDescriptor<FlightData, BarrageMessage> getClientDoExchangeDescripto
                 .build();
     }
 
-    private class BarrageDataMarshaller implements MethodDescriptor.Marshaller<BarrageMessage> {
+    private static class BarrageDataMarshaller implements MethodDescriptor.Marshaller<BarrageMessage> {
         private final BarrageSnapshotOptions options;
         private final ChunkType[] columnChunkTypes;
         private final Class<?>[] columnTypes;
@@ -366,7 +366,6 @@ public void onError(@NotNull final Throwable t) {
         }
     }
 
-
     /**
      * The Completable Future is used to encapsulate the concept that the table is filled with requested data.
      */
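The BarrageDataMarshaller change in the last hunk is independent of the factory work: making the nested class static drops the implicit reference to the enclosing BarrageSnapshotImpl, presumably so a retained marshaller cannot keep the whole snapshot reachable. A small self-contained illustration of that general Java behavior (hypothetical names):

public class Outer {
    private final byte[] large = new byte[1 << 20];

    class Inner {} // an inner class instance keeps Outer.this (and `large`) reachable

    static class Nested {} // a static nested class has no hidden reference to Outer

    Object innerHandle() {
        return new Inner(); // retains the enclosing Outer instance
    }

    static Object nestedHandle() {
        return new Nested(); // the Outer instance can be garbage collected
    }
}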