Skip to content

Commit

Permalink
Revert "Generate Coral's RelNode for views from base table schema (#409
Browse files Browse the repository at this point in the history
…) (#489)

* Revert "Generate Coral's RelNode for views from base table schema (#409)"

This reverts commit 76789bf.

* ./gradlew spotlessApply

* modify workflow action
  • Loading branch information
aastha25 authored Feb 13, 2024
1 parent f200f21 commit 40f6958
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 28 deletions.
27 changes: 25 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ name: CI

on:
push:
branches: ['master']
branches: ['master', 'li-trino-hotfix']
tags-ignore: [v*] # release tags are autogenerated after a successful CI, no need to run CI against them
pull_request:
branches: ['**']
Expand All @@ -35,7 +35,7 @@ jobs:
- name: 3. Perform build
run: ./gradlew -i build

- name: 4. Perform release
- name: 4. Perform release for master branch if commit is on master
# Release job, only for pushes to the main development branch
if: github.event_name == 'push'
&& github.ref == 'refs/heads/master'
Expand All @@ -49,3 +49,26 @@ jobs:
SONATYPE_PWD: ${{secrets.SONATYPE_PWD}}
PGP_KEY: ${{secrets.PGP_KEY}}
PGP_PWD: ${{secrets.PGP_PWD}}

- name: 5. Derive version for li-trino-hotfix
if: github.ref == 'refs/heads/li-trino-hotfix'
run: |
branch_name="${{ github.ref#refs/heads/ }}"
echo "branch_name=${branch_name}" >> $GITHUB_ENV
echo "DERIVED_VERSION=2.2.27-${branch_name}" >> $GITHUB_ENV
- name: 6. Perform release for li-trino-hotfix branch if commit is this branch
# Release job, only for pushes to the main development branch
if: github.event_name == 'push'
&& github.ref == 'refs/heads/li-trino-hotfix'
&& github.repository == 'linkedin/coral'
&& !contains(toJSON(github.event.commits.*.message), '[skip release]')

run: ./gradlew githubRelease publishToSonatype closeAndReleaseStagingRepository --stacktrace -Pversion="${{ env.DERIVED_VERSION }}"
env:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
SONATYPE_USER: ${{secrets.SONATYPE_USER}}
SONATYPE_PWD: ${{secrets.SONATYPE_PWD}}
PGP_KEY: ${{secrets.PGP_KEY}}
PGP_PWD: ${{secrets.PGP_PWD}}

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -10,9 +10,17 @@
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.hadoop.hive.metastore.api.Table;

import com.linkedin.coral.com.google.common.base.Preconditions;
import com.linkedin.coral.com.google.common.base.Throwables;
import com.linkedin.coral.com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -42,10 +50,129 @@ public RelNode toRel(RelOptTable.ToRelContext relContext, RelOptTable relOptTabl
try {
RelRoot root = relContext.expandView(relOptTable.getRowType(), hiveTable.getViewExpandedText(), schemaPath,
ImmutableList.of(hiveTable.getTableName()));
root = root.withRel(createCastRel(root.rel, relOptTable.getRowType(), RelFactories.DEFAULT_PROJECT_FACTORY));
//root = root.withRel(RelOptUtil.createCastRel(root.rel, relOptTable.getRowType()));
return root.rel;
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, RuntimeException.class);
throw new RuntimeException("Error while parsing view definition", e);
}
}

public static RelNode createCastRel(final RelNode rel, RelDataType castRowType,
RelFactories.ProjectFactory projectFactory) {
Preconditions.checkNotNull(projectFactory);

RelDataType rowType = rel.getRowType();
RelDataTypeFactory typeFactory = rel.getCluster().getTypeFactory();
if (isRowCastRequired(rowType, castRowType, typeFactory)) {
final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
final List<RexNode> castExps = RexUtil.generateCastExpressions(rexBuilder, castRowType, rowType);
return projectFactory.createProject(rel, castExps, rowType.getFieldNames());
} else {
return rel;
}
}

// Hive-based Dali readers allow extra fields on struct type columns to flow through. We
// try to match that behavior. Hive-based Dali readers do not allow top level columns to flow through
// Returns true if an explicit cast is required from rowType to castRowType, false otherwise
private static boolean isRowCastRequired(RelDataType rowType, RelDataType castRowType,
RelDataTypeFactory typeFactory) {
if (rowType == castRowType) {
return false;
}
List<RelDataTypeField> relFields = rowType.getFieldList();
List<RelDataTypeField> castFields = castRowType.getFieldList();
if (relFields.size() != castFields.size()) {
return true;
}
// the following method will return false if the schema evolution is backward compatible.
// i.e., when new fields are inserted to the middle or appended to the end of the field list.
return isFieldListCastRequired(relFields, castFields, typeFactory);
}

// Returns true if an explicit cast is required from inputType to castToType, false otherwise
// From what we noticed, cast will be required when there's schema promotion
private static boolean isTypeCastRequired(RelDataType inputType, RelDataType castToType,
RelDataTypeFactory typeFactory) {
if (inputType == castToType) {
return false;
}
if (inputType.getSqlTypeName() == ANY || castToType.getSqlTypeName() == ANY) {
return false;
}
// Hive has string type which is varchar(65k). This results in lot of redundant
// cast operations due to different precision(length) of varchar.
// Also, all projections of string literals are of type CHAR but the hive schema
// may mark that column as string. This again results in unnecessary cast operations.
// We avoid all these casts
if ((inputType.getSqlTypeName() == CHAR || inputType.getSqlTypeName() == VARCHAR)
&& castToType.getSqlTypeName() == VARCHAR) {
return false;
}

// Make sure both source and target has same root SQL Type
if (inputType.getSqlTypeName() != castToType.getSqlTypeName()) {
return true;
}

// Calcite gets the castToType from the view schema, and it's nullable by default, but the inputType
// is inferred from the view sql and for some columns where functions like named_strcut, ISNULL are applied,
// calcite will set the inputType to be non-nullable. Thus it generates unnecessary cast operators.
// we should ignore the nullability check when deciding whether to do field casting
RelDataType nullableFieldDataType = typeFactory.createTypeWithNullability(inputType, true);
switch (inputType.getSqlTypeName()) {
case ARRAY:
return isTypeCastRequired(inputType.getComponentType(), castToType.getComponentType(), typeFactory);
case MAP:
return isTypeCastRequired(inputType.getKeyType(), castToType.getKeyType(), typeFactory)
|| isTypeCastRequired(inputType.getValueType(), inputType.getValueType(), typeFactory);
case ROW:
return isFieldListCastRequired(inputType.getFieldList(), castToType.getFieldList(), typeFactory);
default:
return !nullableFieldDataType.equals(castToType);
}
}

/**
* The method will check if the inputFields has to be casted to castToFields.
*
* e.g.
* (1) if castToFields = List(f1, f3), inputFields = List(f1, f2, f3), then return false.
* (2) if castToFields = List(f1, f3), inputFields = List(f1, f3, f4), then return false.
* (3) if castToFields = List(f1, f3), inputFields = List(f1, f4), then return true.
*
* @param inputFields a list of RelDataTypeField to cast from if required.
* @param castToFields a list of RelDataTypeField to cast to if required.
*
* @return if a CAST operator will be generated from inputFields to castToFields
*/
private static boolean isFieldListCastRequired(List<RelDataTypeField> inputFields,
List<RelDataTypeField> castToFields, RelDataTypeFactory typeFactory) {
if (inputFields.size() < castToFields.size()) {
return true;
}

int inputIndex = 0;
int inputFieldsSize = inputFields.size();
for (RelDataTypeField castToField : castToFields) {
while (inputIndex < inputFieldsSize) {
RelDataTypeField inputField = inputFields.get(inputIndex);
if (inputField.getName().equalsIgnoreCase(castToField.getName())) {
if (isTypeCastRequired(inputField.getType(), castToField.getType(), typeFactory)) {
return true;
}
break;
} else {
++inputIndex;
}
}
// If there's no match for a field in castToFields to inputFields
if (inputIndex++ >= inputFieldsSize) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -140,16 +140,12 @@ public static RelDataType convert(StructTypeInfo structType, final RelDataTypeFa
// The schema of output Struct conforms to https://github.com/trinodb/trino/pull/3483
// except we adopted "integer" for the type of "tag" field instead of "tinyint" in the Trino patch
// for compatibility with other platforms that Iceberg currently doesn't support tinyint type.
// When the field count inside UnionTypeInfo is one, we surface the underlying RelDataType instead.

// Note: this is subject to change in the future pending on the discussion in
// https://mail-archives.apache.org/mod_mbox/iceberg-dev/202112.mbox/browser
public static RelDataType convert(UnionTypeInfo unionType, RelDataTypeFactory dtFactory) {
List<RelDataType> fTypes = unionType.getAllUnionObjectTypeInfos().stream()
.map(typeInfo -> convert(typeInfo, dtFactory)).collect(Collectors.toList());
if (fTypes.size() == 1) {
return dtFactory.createTypeWithNullability(fTypes.get(0), true);
}
List<String> fNames = IntStream.range(0, unionType.getAllUnionObjectTypeInfos().size()).mapToObj(i -> "field" + i)
.collect(Collectors.toList());
fTypes.add(0, dtFactory.createSqlType(SqlTypeName.INTEGER));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2018-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2018-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -74,15 +74,6 @@ public RelDataType deriveType(SqlValidator validator, SqlValidatorScope scope, S
if (funcType.isStruct()) {
return funcType.getField(fieldNameStripQuotes(call.operand(1)), false, false).getType();
}

// When the first operand is a SqlBasicCall with a non-struct RelDataType and the second operand is `tag_0`,
// such as `extract_union`(`product`.`value`).`tag_0` or (`extract_union`(`product`.`value`).`id`).`tag_0`,
// derived data type is first operand's RelDataType.
// This strategy ensures that RelDataType derivation remains successful for the specified sqlCalls while maintaining backward compatibility.
// Such SqlCalls are transformed {@link com.linkedin.coral.transformers.SingleUnionFieldReferenceTransformer}
if (FunctionFieldReferenceOperator.fieldNameStripQuotes(call.operand(1)).equalsIgnoreCase("tag_0")) {
return funcType;
}
}
return super.deriveType(validator, scope, call);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -13,7 +13,6 @@
import com.linkedin.coral.common.transformers.SqlCallTransformers;
import com.linkedin.coral.common.utils.TypeDerivationUtil;
import com.linkedin.coral.transformers.ShiftArrayIndexTransformer;
import com.linkedin.coral.transformers.SingleUnionFieldReferenceTransformer;


/**
Expand All @@ -24,8 +23,7 @@ public class HiveSqlNodeToCoralSqlNodeConverter extends SqlShuttle {

public HiveSqlNodeToCoralSqlNodeConverter(SqlValidator sqlValidator, SqlNode topSqlNode) {
TypeDerivationUtil typeDerivationUtil = new TypeDerivationUtil(sqlValidator, topSqlNode);
operatorTransformerList = SqlCallTransformers.of(new ShiftArrayIndexTransformer(typeDerivationUtil),
new SingleUnionFieldReferenceTransformer(typeDerivationUtil));
operatorTransformerList = SqlCallTransformers.of(new ShiftArrayIndexTransformer(typeDerivationUtil));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ static Schema relDataTypeToAvroTypeNonNullable(@Nonnull RelDataType relDataType,

if (relDataType instanceof MapSqlType) {
final MapSqlType mapSqlType = (MapSqlType) relDataType;
if (SqlTypeName.NULL == mapSqlType.getKeyType().getSqlTypeName()
&& SqlTypeName.NULL == mapSqlType.getValueType().getSqlTypeName()) {
return Schema.createMap(SchemaUtilities.makeNullable(Schema.create(Schema.Type.STRING), false));
}
if (!SqlTypeName.CHAR_TYPES.contains(mapSqlType.getKeyType().getSqlTypeName())) {
throw new UnsupportedOperationException(
"Key of Map can only be a String: " + mapSqlType.getKeyType().getSqlTypeName().getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2018-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2018-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -401,7 +401,8 @@ public void testIntervalYearToMonth() {
@Test
public void testSchemaPromotionView() {
RelNode relNode = TestUtils.toRelNode(String.join("\n", "", "SELECT * ", "FROM view_schema_promotion_wrapper"));
String targetSql = "SELECT *\n" + "FROM default.schema_promotion schema_promotion";
String targetSql = "SELECT schema_promotion.a, CAST(schema_promotion.b AS ARRAY<INTEGER>) b\n"
+ "FROM default.schema_promotion schema_promotion";
assertEquals(createCoralSpark(relNode).getSparkSql(), targetSql);
}

Expand Down

0 comments on commit 40f6958

Please sign in to comment.