Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune Non-referenced Fields from Nested RowTypes #23074

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,16 @@
<old>interface io.trino.spi.exchange.ExchangeManagerHandleResolver</old>
<justification>cleanup</justification>
</item>
<item>
<ignore>true</ignore>
<code>java.method.visibilityIncreased</code>
<new>method void io.trino.spi.type.RowType::&lt;init&gt;(io.trino.spi.type.TypeSignature, java.util.List&lt;io.trino.spi.type.RowType.Field&gt;)</new>
</item>
<item>
<ignore>true</ignore>
<code>java.method.visibilityIncreased</code>
<new>method io.trino.spi.type.TypeSignature io.trino.spi.type.RowType::makeSignature(java.util.List&lt;io.trino.spi.type.RowType.Field&gt;)</new>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
4 changes: 2 additions & 2 deletions core/trino-spi/src/main/java/io/trino/spi/type/RowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public class RowType
private final int flatFixedSize;
private final boolean flatVariableWidth;

private RowType(TypeSignature typeSignature, List<Field> originalFields)
protected RowType(TypeSignature typeSignature, List<Field> originalFields)
{
super(typeSignature, SqlRow.class, RowBlock.class);

Expand Down Expand Up @@ -188,7 +188,7 @@ public static Field field(Type type)
return new Field(Optional.empty(), type);
}

private static TypeSignature makeSignature(List<Field> fields)
protected static TypeSignature makeSignature(List<Field> fields)
{
int size = fields.size();
if (size == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.hive.formats;

import com.google.common.collect.ImmutableList;
import io.trino.spi.type.RowType;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;

/**
* A SparseRowType is a RowType that conveys which fields are active in a row.
* <p>
* It manages the positional mapping between the "sparse" fields and the
* underlying "dense" RowType. It allows position-based deserializers to know
* the complete schema for a Row while also knowing which fields need to be
* deserialized. Name-based deserializers can simply use the dense fields
* from the underlying RowType.
*/
public class SparseRowType
extends RowType
{
private final List<Field> sparseFields;
private final List<Integer> offsets;

private SparseRowType(List<Field> sparseFields, List<Field> denseFields, List<Integer> offsets)
{
super(makeSignature(denseFields), denseFields);
this.sparseFields = sparseFields;
this.offsets = offsets;
}

/**
* Create a SparseRowType from a list of fields and a mask indicating which fields are active.
*/
public static SparseRowType from(List<Field> fields, boolean[] mask)
{
checkArgument(fields.size() == mask.length);

ImmutableList.Builder<Integer> offsets = ImmutableList.builder();
ImmutableList.Builder<Field> denseFields = ImmutableList.builder();

int offset = 0;
for (int i = 0; i < mask.length; i++) {
if (mask[i]) {
denseFields.add(fields.get(i));
offsets.add(offset++);
}
else {
offsets.add(-1);
}
}

return new SparseRowType(ImmutableList.copyOf(fields), denseFields.build(), offsets.build());
}

public static SparseRowType initial(List<Field> fields, Integer activeField)
{
boolean[] mask = new boolean[fields.size()];
mask[activeField] = true;
return SparseRowType.from(fields, mask);
}

public List<Field> getSparseFields()
{
return sparseFields;
}

/**
* Get the offset to the dense field for the sparseField at sparsePosition.
*/
public Integer getOffset(int sparsePosition)
{
return offsets.get(sparsePosition) >= 0
? offsets.get(sparsePosition)
: null;
}

public List<Integer> getOffsets()
{
return offsets.stream().map(i -> i >= 0 ? i : null).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.type.RowType;

import java.util.List;
import java.util.stream.IntStream;

public class StructEncoding
extends BlockEncoding
Expand All @@ -31,6 +32,7 @@ public class StructEncoding
private final byte separator;
private final boolean lastColumnTakesRest;
private final List<TextColumnEncoding> structFields;
private final List<Integer> fieldOffsets;

public StructEncoding(
RowType rowType,
Expand All @@ -45,6 +47,26 @@ public StructEncoding(
this.separator = separator;
this.lastColumnTakesRest = lastColumnTakesRest;
this.structFields = structFields;
this.fieldOffsets = IntStream.range(0, structFields.size())
.boxed()
.toList();
}

public StructEncoding(
RowType rowType,
Slice nullSequence,
byte separator,
Byte escapeByte,
boolean lastColumnTakesRest,
List<TextColumnEncoding> structFields,
List<Integer> fieldOffsets)
{
super(rowType, nullSequence, escapeByte);
this.rowType = rowType;
this.separator = separator;
this.lastColumnTakesRest = lastColumnTakesRest;
this.structFields = structFields;
this.fieldOffsets = fieldOffsets;
}

@Override
Expand Down Expand Up @@ -80,7 +102,10 @@ public void decodeValueInto(BlockBuilder builder, Slice slice, int offset, int l
while (currentOffset < end) {
byte currentByte = slice.getByte(currentOffset);
if (currentByte == separator) {
decodeElementValueInto(fieldIndex, fieldBuilders.get(fieldIndex), slice, elementOffset, currentOffset - elementOffset);
Integer fieldOffset = fieldOffsets.get(fieldIndex);
if (fieldOffset != null) {
decodeElementValueInto(fieldIndex, fieldBuilders.get(fieldOffset), slice, elementOffset, currentOffset - elementOffset);
}
elementOffset = currentOffset + 1;
fieldIndex++;
if (lastColumnTakesRest && fieldIndex == structFields.size() - 1) {
Expand All @@ -98,7 +123,10 @@ else if (isEscapeByte(currentByte)) {
}
currentOffset++;
}
decodeElementValueInto(fieldIndex, fieldBuilders.get(fieldIndex), slice, elementOffset, end - elementOffset);
Integer fieldOffset = fieldOffsets.get(fieldIndex);
if (fieldOffset != null) {
decodeElementValueInto(fieldIndex, fieldBuilders.get(fieldOffset), slice, elementOffset, end - elementOffset);
}
fieldIndex++;

// missing fields are null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.hive.formats.encodings.text;

import io.trino.hive.formats.SparseRowType;
import io.trino.hive.formats.encodings.ColumnEncodingFactory;
import io.trino.spi.TrinoException;
import io.trino.spi.type.ArrayType;
Expand Down Expand Up @@ -136,6 +137,21 @@ private TextColumnEncoding getEncoding(Type type, int depth)
keyEncoding,
valueEncoding);
}
if (type instanceof SparseRowType sparseRowType) {
List<TextColumnEncoding> fieldEncodings = sparseRowType.getSparseFields().stream()
.map(RowType.Field::getType)
.map(fieldType -> getEncoding(fieldType, depth + 1))
.collect(toImmutableList());
List<Integer> fieldOffsets = sparseRowType.getOffsets();
return new StructEncoding(
sparseRowType,
textEncodingOptions.getNullSequence(),
getSeparator(depth + 1),
textEncodingOptions.getEscapeByte(),
textEncodingOptions.isLastColumnTakesRest(),
fieldEncodings,
fieldOffsets);
}
if (type instanceof RowType rowType) {
List<TextColumnEncoding> fieldEncodings = rowType.getTypeParameters().stream()
.map(fieldType -> getEncoding(fieldType, depth + 1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.hive.formats.DistinctMapKeys;
import io.trino.hive.formats.SparseRowType;
import io.trino.hive.formats.line.Column;
import io.trino.hive.formats.line.LineBuffer;
import io.trino.hive.formats.line.LineDeserializer;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.Set;
import java.util.function.IntFunction;
import java.util.regex.Pattern;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -746,6 +748,7 @@ private static class RowDecoder
{
private final List<FieldName> fieldNames;
private final List<Decoder> fieldDecoders;
private final List<Integer> decoderOffsets;
private final boolean dotsInKeyNames;

public RowDecoder(RowType rowType, OpenXJsonOptions options, List<Decoder> fieldDecoders)
Expand All @@ -755,6 +758,16 @@ public RowDecoder(RowType rowType, OpenXJsonOptions options, List<Decoder> field
.map(fieldName -> fieldName.toLowerCase(Locale.ROOT))
.map(originalValue -> new FieldName(originalValue, options))
.collect(toImmutableList());
if (rowType instanceof SparseRowType sparseRowType) {
// build an inverse mapping, from dense fields to sparse fields
decoderOffsets = IntStream.range(0, sparseRowType.getSparseFields().size())
.filter(sparsePos -> sparseRowType.getOffset(sparsePos) != null)
.boxed()
.toList();
}
else {
decoderOffsets = IntStream.range(0, fieldDecoders.size()).boxed().toList();
}
this.fieldDecoders = fieldDecoders;
this.dotsInKeyNames = options.isDotsInFieldNames();
}
Expand Down Expand Up @@ -831,7 +844,8 @@ else if (dotsInKeyNames) {
private void decodeValueFromList(List<?> jsonArray, IntFunction<BlockBuilder> fieldBuilders)
{
for (int i = 0; i < fieldDecoders.size(); i++) {
Object fieldValue = jsonArray.size() > i ? jsonArray.get(i) : null;
int position = decoderOffsets.get(i);
Object fieldValue = jsonArray.size() > position ? jsonArray.get(position) : null;
BlockBuilder blockBuilder = fieldBuilders.apply(i);
if (fieldValue == null) {
blockBuilder.appendNull();
Expand Down
Loading
Loading