Skip to content

Commit

Permalink
Make work for SEQUENCE file
Browse files Browse the repository at this point in the history
  • Loading branch information
rmarrowstone committed Aug 20, 2024
1 parent b96dc9e commit 467ae6d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public class SparseRowType
extends RowType
{
private final List<Field> sparseFields;
private final int[] offsets;
private final List<Integer> offsets;

private SparseRowType(List<Field> sparseFields, List<Field> denseFields, int[] offsets)
private SparseRowType(List<Field> sparseFields, List<Field> denseFields, List<Integer> offsets)
{
super(makeSignature(denseFields), denseFields);
this.sparseFields = sparseFields;
Expand All @@ -49,21 +49,21 @@ public static SparseRowType from(List<Field> fields, boolean[] mask)
{
checkArgument(fields.size() == mask.length);

int[] offsets = new int[fields.size()];
ImmutableList.Builder<Integer> offsets = ImmutableList.builder();
ImmutableList.Builder<Field> denseFields = ImmutableList.builder();

int offset = 0;
for (int i = 0; i < mask.length; i++) {
if (mask[i]) {
denseFields.add(fields.get(i));
offsets[i] = offset++;
offsets.add(offset++);
}
else {
offsets[i] = -1;
offsets.add(-1);
}
}

return new SparseRowType(ImmutableList.copyOf(fields), denseFields.build(), offsets);
return new SparseRowType(ImmutableList.copyOf(fields), denseFields.build(), offsets.build());
}

public static SparseRowType initial(List<Field> fields, Integer activeField)
Expand All @@ -83,8 +83,13 @@ public List<Field> getSparseFields()
*/
public Integer getOffset(int sparsePosition)
{
return offsets[sparsePosition] >= 0
? offsets[sparsePosition]
return offsets.get(sparsePosition) >= 0
? offsets.get(sparsePosition)
: null;
}

public List<Integer> getOffsets()
{
return offsets.stream().map(i -> i >= 0 ? i : null).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.type.RowType;

import java.util.List;
import java.util.stream.IntStream;

public class StructEncoding
extends BlockEncoding
Expand All @@ -31,6 +32,7 @@ public class StructEncoding
private final byte separator;
private final boolean lastColumnTakesRest;
private final List<TextColumnEncoding> structFields;
private final List<Integer> fieldOffsets;

public StructEncoding(
RowType rowType,
Expand All @@ -45,6 +47,26 @@ public StructEncoding(
this.separator = separator;
this.lastColumnTakesRest = lastColumnTakesRest;
this.structFields = structFields;
this.fieldOffsets = IntStream.range(0, structFields.size())
.boxed()
.toList();
}

public StructEncoding(
RowType rowType,
Slice nullSequence,
byte separator,
Byte escapeByte,
boolean lastColumnTakesRest,
List<TextColumnEncoding> structFields,
List<Integer> fieldOffsets)
{
super(rowType, nullSequence, escapeByte);
this.rowType = rowType;
this.separator = separator;
this.lastColumnTakesRest = lastColumnTakesRest;
this.structFields = structFields;
this.fieldOffsets = fieldOffsets;
}

@Override
Expand Down Expand Up @@ -80,7 +102,10 @@ public void decodeValueInto(BlockBuilder builder, Slice slice, int offset, int l
while (currentOffset < end) {
byte currentByte = slice.getByte(currentOffset);
if (currentByte == separator) {
decodeElementValueInto(fieldIndex, fieldBuilders.get(fieldIndex), slice, elementOffset, currentOffset - elementOffset);
Integer fieldOffset = fieldOffsets.get(fieldIndex);
if (fieldOffset != null) {
decodeElementValueInto(fieldIndex, fieldBuilders.get(fieldOffset), slice, elementOffset, currentOffset - elementOffset);
}
elementOffset = currentOffset + 1;
fieldIndex++;
if (lastColumnTakesRest && fieldIndex == structFields.size() - 1) {
Expand All @@ -98,7 +123,10 @@ else if (isEscapeByte(currentByte)) {
}
currentOffset++;
}
decodeElementValueInto(fieldIndex, fieldBuilders.get(fieldIndex), slice, elementOffset, end - elementOffset);
Integer fieldOffset = fieldOffsets.get(fieldIndex);
if (fieldOffset != null) {
decodeElementValueInto(fieldIndex, fieldBuilders.get(fieldOffset), slice, elementOffset, end - elementOffset);
}
fieldIndex++;

// missing fields are null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.hive.formats.encodings.text;

import io.trino.hive.formats.SparseRowType;
import io.trino.hive.formats.encodings.ColumnEncodingFactory;
import io.trino.spi.TrinoException;
import io.trino.spi.type.ArrayType;
Expand Down Expand Up @@ -136,6 +137,21 @@ private TextColumnEncoding getEncoding(Type type, int depth)
keyEncoding,
valueEncoding);
}
if (type instanceof SparseRowType sparseRowType) {
List<TextColumnEncoding> fieldEncodings = sparseRowType.getSparseFields().stream()
.map(RowType.Field::getType)
.map(fieldType -> getEncoding(fieldType, depth + 1))
.collect(toImmutableList());
List<Integer> fieldOffsets = sparseRowType.getOffsets();
return new StructEncoding(
sparseRowType,
textEncodingOptions.getNullSequence(),
getSeparator(depth + 1),
textEncodingOptions.getEscapeByte(),
textEncodingOptions.isLastColumnTakesRest(),
fieldEncodings,
fieldOffsets);
}
if (type instanceof RowType rowType) {
List<TextColumnEncoding> fieldEncodings = rowType.getTypeParameters().stream()
.map(fieldType -> getEncoding(fieldType, depth + 1))
Expand Down

0 comments on commit 467ae6d

Please sign in to comment.