Skip to content

Commit

Permalink
feat: Add arrow Schema as supported type (#6285)
Browse files Browse the repository at this point in the history
This is in support of #6023, which needs a way to encode Schema as
VARBINARY. This also serves as the potential hook points needed to
implement something like #58.
  • Loading branch information
devinrsmith authored Oct 25, 2024
1 parent 744357d commit b823ccd
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.extensions.barrage.util.ArrowIpcUtil;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.vector.Vector;
import org.apache.arrow.vector.types.pojo.Schema;

import java.math.BigDecimal;
import java.math.BigInteger;
Expand Down Expand Up @@ -167,8 +169,13 @@ public <T> ChunkInputStreamGenerator makeInputStreamGenerator(ChunkType chunkTyp
return nanoOfDay;
});
}
// TODO (core#58): add custom barrage serialization/deserialization support
// Migrate Schema to custom format when available.
if (type == Schema.class) {
return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
ArrowIpcUtil::serialize);
}
// TODO (core#936): support column conversion modes

return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write(item.toString().getBytes(Charsets.UTF_8)));
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,28 @@
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.ColumnConversionMode;
import io.deephaven.extensions.barrage.util.ArrowIpcUtil;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.DataInput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MS_PER_DAY;

Expand Down Expand Up @@ -60,13 +68,7 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
case Object:
if (typeInfo.type().isArray()) {
if (typeInfo.componentType() == byte.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
outChunk, outOffset, totalRows);
return ByteArrayChunkReader.BYTEARRAY_READER;
} else {
return new VarListChunkReader<>(options, typeInfo, this);
}
Expand All @@ -75,30 +77,10 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
return new VectorChunkReader(options, typeInfo, this);
}
if (typeInfo.type() == BigInteger.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigInteger::new,
outChunk, outOffset, totalRows);
return BigIntegerChunkReader.BIG_INTEGER_CHUNK_READER;
}
if (typeInfo.type() == BigDecimal.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(final byte[] buf, final int offset, final int length) -> {
// read the int scale value as little endian, arrow's endianness.
final byte b1 = buf[offset];
final byte b2 = buf[offset + 1];
final byte b3 = buf[offset + 2];
final byte b4 = buf[offset + 3];
final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
},
outChunk, outOffset, totalRows);
return BigDecimalChunkReader.BIG_DECIMAL_CHUNK_READER;
}
if (typeInfo.type() == Instant.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
Expand Down Expand Up @@ -184,19 +166,131 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
return new LongChunkReader(options).transform(
value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value));
}
if (typeInfo.type() == String.class ||
options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is,
fieldNodeIter,
bufferInfoIter,
(buf, off, len) -> new String(buf, off, len, Charsets.UTF_8), outChunk, outOffset,
totalRows);
if (typeInfo.type() == String.class) {
return StringChunkReader.STRING_CHUNK_READER;
}
// TODO (core#58): add custom barrage serialization/deserialization support
// // Migrate Schema to custom format when available.
if (typeInfo.type() == Schema.class) {
return SchemaChunkReader.SCHEMA_CHUNK_READER;
}
// Note: this Stringify check should come last
if (options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
return StringChunkReader.STRING_CHUNK_READER;
}
// TODO (core#936): support column conversion modes
throw new UnsupportedOperationException(
"Do not yet support column conversion mode: " + options.columnConversionMode());
default:
throw new UnsupportedOperationException();
}
}

private enum ByteArrayChunkReader implements ChunkReader {
BYTEARRAY_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
ByteArrayChunkReader::readBytes,
outChunk,
outOffset,
totalRows);
}

private static byte[] readBytes(byte[] buf, int off, int len) {
return Arrays.copyOfRange(buf, off, off + len);
}
}

private enum BigIntegerChunkReader implements ChunkReader {
BIG_INTEGER_CHUNK_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigInteger::new,
outChunk,
outOffset,
totalRows);
}
}

private enum BigDecimalChunkReader implements ChunkReader {
BIG_DECIMAL_CHUNK_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigDecimalChunkReader::readBigDecimal,
outChunk,
outOffset,
totalRows);
}

private static BigDecimal readBigDecimal(byte[] buf, int offset, int length) {
// read the int scale value as little endian, arrow's endianness.
final byte b1 = buf[offset];
final byte b2 = buf[offset + 1];
final byte b3 = buf[offset + 2];
final byte b4 = buf[offset + 3];
final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
}
}

private enum StringChunkReader implements ChunkReader {
STRING_CHUNK_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
StringChunkReader::readString,
outChunk,
outOffset,
totalRows);
}

private static String readString(byte[] buf, int off, int len) {
return new String(buf, off, len, Charsets.UTF_8);
}
}

private enum SchemaChunkReader implements ChunkReader {
SCHEMA_CHUNK_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
ArrowIpcUtil::deserialize,
outChunk,
outOffset,
totalRows);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.util;

import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;

public class ArrowIpcUtil {
public static long serialize(OutputStream outputStream, Schema schema) throws IOException {
// not buffered. no flushing needed. not closing write channel
return MessageSerializer.serialize(new WriteChannel(Channels.newChannel(outputStream)), schema);
}

public static Schema deserialize(InputStream in) throws IOException {
// not buffered. not closing read channel
return MessageSerializer.deserializeSchema(new ReadChannel(Channels.newChannel(in)));
}

public static Schema deserialize(byte[] buf, int offset, int length) throws IOException {
return deserialize(new ByteArrayInputStream(buf, offset, length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ private static Optional<String> extractFlatBufferVersion(String method) {
Instant.class,
Boolean.class,
LocalDate.class,
LocalTime.class));
LocalTime.class,
Schema.class));

public static ByteString schemaBytesFromTable(@NotNull final Table table) {
return schemaBytesFromTableDefinition(table.getDefinition(), table.getAttributes(), table.isFlat());
Expand Down Expand Up @@ -745,7 +746,8 @@ private static ArrowType arrowTypeFor(Class<?> type) {
return Types.MinorType.TIMENANO.getType();
}
if (type == BigDecimal.class
|| type == BigInteger.class) {
|| type == BigInteger.class
|| type == Schema.class) {
return Types.MinorType.VARBINARY.getType();
}
if (type == Instant.class || type == ZonedDateTime.class) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.util;

import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

public class ArrowIpcUtilTest {

public static final Field FOO = new Field("Foo", FieldType.nullable(Types.MinorType.INT.getType()), null);
public static final Field BAR = new Field("Bar", FieldType.notNullable(Types.MinorType.INT.getType()), null);
public static final Field BAZ = new Field("Baz",
new FieldType(true, Types.MinorType.VARCHAR.getType(), null, Map.of("k1", "v1", "k2", "v2")), null);

private static final Schema SCHEMA_1 = new Schema(List.of(FOO, BAR, BAZ));
private static final Schema SCHEMA_2 =
new Schema(List.of(FOO, BAR, BAZ), Map.of("key1", "value1", "key2", "value2"));

@Test
public void testSchemas() throws IOException {
verifySerDeser(SCHEMA_1);
verifySerDeser(SCHEMA_2);
}

// A bit circular, but better than nothing.
public static void verifySerDeser(Schema schema) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final long length = ArrowIpcUtil.serialize(baos, schema);
assertThat(length).isEqualTo(baos.size());
Schema deserialized = ArrowIpcUtil.deserialize(baos.toByteArray(), 0, (int) length);
assertThat(deserialized).isEqualTo(schema);
}
}

0 comments on commit b823ccd

Please sign in to comment.