Skip to content

Commit

Permalink
Round two, commit #1: apply the same change to ChunkInputStreamGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Jun 26, 2024
1 parent db7a35d commit f81ad20
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.deephaven.engine.rowset.impl.ExternalizableRowSetUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.DefaultChunkInputStreamGeneratorFactory;
import io.deephaven.extensions.barrage.chunk.SingleElementListHeaderInputStreamGenerator;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil.ExposedByteArrayOutputStream;
import io.deephaven.extensions.barrage.util.BarrageUtil;
Expand Down Expand Up @@ -124,7 +125,8 @@ public static class ModColumnGenerator implements SafeCloseable {

/**
 * Wraps the modified-rows data for a single column: captures the modified row set and builds
 * stream generators for the column's data chunks.
 *
 * @param col the modified column data (row set, chunk payloads, and type information) to wrap
 * @throws IOException if the row set generator cannot serialize the modified rows
 */
ModColumnGenerator(final BarrageMessage.ModColumnData col) throws IOException {
    rowsModified = new RowSetGenerator(col.rowsModified);
    // Use the default factory; callers that need custom serialization construct generators elsewhere.
    data = new ChunkListInputStreamGenerator(col.type, col.componentType, col.data, col.chunkType,
            DefaultChunkInputStreamGeneratorFactory.INSTANCE);
}

@Override
Expand Down Expand Up @@ -174,7 +176,7 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message,
for (int i = 0; i < message.addColumnData.length; ++i) {
BarrageMessage.AddColumnData columnData = message.addColumnData[i];
addColumnData[i] = new ChunkListInputStreamGenerator(columnData.type, columnData.componentType,
columnData.data, columnData.chunkType);
columnData.data, columnData.chunkType, DefaultChunkInputStreamGeneratorFactory.INSTANCE);
}

modColumnData = new ModColumnGenerator[message.modColumnData.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGeneratorFactory;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.SafeCloseable;

Expand All @@ -20,20 +21,18 @@ public class ChunkListInputStreamGenerator implements SafeCloseable {
private final ChunkInputStreamGenerator emptyGenerator;

/**
 * Builds one {@link ChunkInputStreamGenerator} per chunk of column data, tracking each chunk's
 * starting row offset, plus an extra generator over an empty chunk of the same type.
 *
 * @param type the Java type of the column's values
 * @param componentType the component type, for array/Vector columns
 * @param data the list of data chunks to wrap; one generator is created per chunk
 * @param chunkType the chunk type used to create each generator (and the empty chunk)
 * @param factory the factory used to create each generator
 */
public ChunkListInputStreamGenerator(Class<?> type, Class<?> componentType, List<Chunk<Values>> data,
        ChunkType chunkType, ChunkInputStreamGeneratorFactory factory) {
    // create an input stream generator for each chunk
    ChunkInputStreamGenerator[] generators = new ChunkInputStreamGenerator[data.size()];

    // each generator is told where its chunk begins within the overall column
    long rowOffset = 0;
    for (int i = 0; i < data.size(); ++i) {
        final Chunk<Values> valuesChunk = data.get(i);
        generators[i] = factory.makeInputStreamGenerator(chunkType, type, componentType, valuesChunk, rowOffset);
        rowOffset += valuesChunk.size();
    }
    this.generators = Arrays.asList(generators);
    // generator over an empty chunk of the same type, for requests that select no rows
    emptyGenerator = factory.makeInputStreamGenerator(chunkType, type, componentType, chunkType.getEmptyChunk(), 0);
}

public List<ChunkInputStreamGenerator> generators() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,19 @@
//
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.util.SafeCloseable;
import io.deephaven.vector.Vector;
import org.jetbrains.annotations.Nullable;

import java.io.DataInput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.PrimitiveIterator;

Expand All @@ -37,153 +24,6 @@ public interface ChunkInputStreamGenerator extends SafeCloseable {
long MIN_LOCAL_DATE_VALUE = QueryConstants.MIN_LONG / MS_PER_DAY;
long MAX_LOCAL_DATE_VALUE = QueryConstants.MAX_LONG / MS_PER_DAY;

/**
 * Creates a {@link ChunkInputStreamGenerator} appropriate for serializing the given chunk, dispatching
 * first on the {@link ChunkType} and then (for Object chunks) on the Java {@code type}.
 *
 * @param chunkType the chunk's storage type; {@code Boolean} is rejected because booleans are stored as bytes
 * @param type the Java type the chunk's values represent
 * @param componentType the component type, used for array and Vector types
 * @param chunk the chunk of values to serialize
 * @param rowOffset the offset of this chunk's first row within the overall column
 * @return a generator that can serialize the chunk to the wire format
 * @param <T> the Java type of the chunk's values
 */
static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
        final ChunkType chunkType,
        final Class<T> type,
        final Class<?> componentType,
        final Chunk<Values> chunk,
        final long rowOffset) {
    // TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
    switch (chunkType) {
        case Boolean:
            throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
        case Char:
            return new CharChunkInputStreamGenerator(chunk.asCharChunk(), Character.BYTES, rowOffset);
        case Byte:
            if (type == Boolean.class || type == boolean.class) {
                // internally we represent booleans as bytes, but the wire format respects arrow's specification
                return new BooleanChunkInputStreamGenerator(chunk.asByteChunk(), rowOffset);
            }
            return new ByteChunkInputStreamGenerator(chunk.asByteChunk(), Byte.BYTES, rowOffset);
        case Short:
            return new ShortChunkInputStreamGenerator(chunk.asShortChunk(), Short.BYTES, rowOffset);
        case Int:
            return new IntChunkInputStreamGenerator(chunk.asIntChunk(), Integer.BYTES, rowOffset);
        case Long:
            return new LongChunkInputStreamGenerator(chunk.asLongChunk(), Long.BYTES, rowOffset);
        case Float:
            return new FloatChunkInputStreamGenerator(chunk.asFloatChunk(), Float.BYTES, rowOffset);
        case Double:
            return new DoubleChunkInputStreamGenerator(chunk.asDoubleChunk(), Double.BYTES, rowOffset);
        case Object:
            // Object chunks dispatch on the Java type: arrays, Vectors, well-known scalar types,
            // boxed primitives, and finally a toString() fallback.
            if (type.isArray()) {
                if (componentType == byte.class) {
                    // byte[] values are written directly as variable-length binary
                    return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
                            (out, item) -> out.write((byte[]) item));
                } else {
                    return new VarListChunkInputStreamGenerator<>(type, chunk.asObjectChunk(), rowOffset);
                }
            }
            if (Vector.class.isAssignableFrom(type)) {
                // noinspection unchecked
                return new VectorChunkInputStreamGenerator(
                        (Class<Vector<?>>) type, componentType, chunk.asObjectChunk(), rowOffset);
            }
            if (type == String.class) {
                return new VarBinaryChunkInputStreamGenerator<String>(chunk.asObjectChunk(), rowOffset,
                        (out, str) -> out.write(str.getBytes(Charsets.UTF_8)));
            }
            if (type == BigInteger.class) {
                return new VarBinaryChunkInputStreamGenerator<BigInteger>(chunk.asObjectChunk(), rowOffset,
                        (out, item) -> out.write(item.toByteArray()));
            }
            if (type == BigDecimal.class) {
                return new VarBinaryChunkInputStreamGenerator<BigDecimal>(chunk.asObjectChunk(), rowOffset,
                        (out, item) -> {
                            final BigDecimal normal = item.stripTrailingZeros();
                            final int v = normal.scale();
                            // Write as little endian, arrow endianness.
                            out.write(0xFF & v);
                            out.write(0xFF & (v >> 8));
                            out.write(0xFF & (v >> 16));
                            out.write(0xFF & (v >> 24));
                            out.write(normal.unscaledValue().toByteArray());
                        });
            }
            if (type == Instant.class) {
                // This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted.
                // Convert to a long chunk of epoch nanos and serialize that instead.
                ObjectChunk<Instant, Values> objChunk = chunk.asObjectChunk();
                WritableLongChunk<Values> outChunk = WritableLongChunk.makeWritableChunk(objChunk.size());
                for (int i = 0; i < objChunk.size(); ++i) {
                    outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i)));
                }
                // the source chunk has been copied out of; return poolable chunks promptly
                if (chunk instanceof PoolableChunk) {
                    ((PoolableChunk) chunk).close();
                }
                return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset);
            }
            if (type == ZonedDateTime.class) {
                // This code path is utilized for arrays and vectors of ZonedDateTime, which cannot be
                // reinterpreted; convert to epoch nanos exactly as the Instant path above does.
                ObjectChunk<ZonedDateTime, Values> objChunk = chunk.asObjectChunk();
                WritableLongChunk<Values> outChunk = WritableLongChunk.makeWritableChunk(objChunk.size());
                for (int i = 0; i < objChunk.size(); ++i) {
                    outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i)));
                }
                // the source chunk has been copied out of; return poolable chunks promptly
                if (chunk instanceof PoolableChunk) {
                    ((PoolableChunk) chunk).close();
                }
                return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset);
            }
            // Boxed primitives are unboxed into their primitive generators.
            if (type == Boolean.class) {
                return BooleanChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
            }
            if (type == Byte.class) {
                return ByteChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
            }
            if (type == Character.class) {
                return CharChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
            }
            if (type == Double.class) {
                return DoubleChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
            }
            if (type == Float.class) {
                return FloatChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
            }
            if (type == Integer.class) {
                return IntChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
            }
            if (type == Long.class) {
                return LongChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
            }
            if (type == Short.class) {
                return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
            }
            if (type == LocalDate.class) {
                // LocalDate is sent as epoch millis (epoch day * MS_PER_DAY); range-checked against
                // the representable window, null maps to NULL_LONG
                return LongChunkInputStreamGenerator.<LocalDate>convertWithTransform(chunk.asObjectChunk(),
                        rowOffset, date -> {
                            if (date == null) {
                                return QueryConstants.NULL_LONG;
                            }
                            final long epochDay = date.toEpochDay();
                            if (epochDay < MIN_LOCAL_DATE_VALUE || epochDay > MAX_LOCAL_DATE_VALUE) {
                                throw new IllegalArgumentException("Date out of range: " + date + " (" + epochDay
                                        + " not in [" + MIN_LOCAL_DATE_VALUE + ", " + MAX_LOCAL_DATE_VALUE + "])");
                            }
                            return epochDay * MS_PER_DAY;
                        });
            }
            if (type == LocalTime.class) {
                // LocalTime is sent as nano-of-day; null maps to NULL_LONG, negative values rejected
                return LongChunkInputStreamGenerator.<LocalTime>convertWithTransform(chunk.asObjectChunk(),
                        rowOffset, time -> {
                            if (time == null) {
                                return QueryConstants.NULL_LONG;
                            }
                            final long nanoOfDay = time.toNanoOfDay();
                            if (nanoOfDay < 0) {
                                throw new IllegalArgumentException("Time out of range: " + time);
                            }
                            return nanoOfDay;
                        });
            }
            // TODO (core#936): support column conversion modes

            // fallback: serialize any other Object type via its toString(), UTF-8 encoded
            return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
                    (out, item) -> out.write(item.toString().getBytes(Charsets.UTF_8)));
        default:
            throw new UnsupportedOperationException();
    }
}

@Deprecated
static WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Values;

/**
 * Factory for creating {@link ChunkInputStreamGenerator} instances to serialize chunks of column data.
 */
public interface ChunkInputStreamGeneratorFactory {
    /**
     * Returns a generator that can serialize the given chunk.
     *
     * @param chunkType the chunk's storage type
     * @param type the Java type the chunk's values represent
     * @param componentType the component type, for array and Vector types
     * @param chunk the chunk of values to serialize
     * @param rowOffset the offset of this chunk's first row within the overall column
     * @return a generator for the given chunk
     * @param <T> the Java type of the chunk's values
     */
    <T> ChunkInputStreamGenerator makeInputStreamGenerator(
            final ChunkType chunkType,
            final Class<T> type,
            final Class<?> componentType,
            final Chunk<Values> chunk,
            final long rowOffset);
}
Loading

0 comments on commit f81ad20

Please sign in to comment.