From d04e9885724918ccb7401b15422236616c62def0 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Mon, 20 Jan 2025 10:18:45 -0700 Subject: [PATCH] Kernel: Add a conversion utility to convert Iceberg schema to Delta schema --- LICENSE.txt | 10 ++ NOTICE.txt | 3 + build.sbt | 3 +- .../util/IcebergTypeToStructType.java | 131 ++++++++++++++++++ .../kernel/internal/util/SchemaUtils.java | 6 + .../internal/util/SchemaUtilsSuite.scala | 67 ++++++++- 6 files changed, 218 insertions(+), 2 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/IcebergTypeToStructType.java diff --git a/LICENSE.txt b/LICENSE.txt index a04059199fb..49bec9b4e17 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -386,3 +386,13 @@ license, which is reproduced here as well: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +-------------------------------------------------------------------------------- + +This product includes code from Apache Iceberg. + +* io.delta.kernel.internal.util.IcebergTypeToStructType visitor implementation based on org.apache.iceberg.spark.TypeToSparkType + +Copyright: 2018-2024 The Apache Software Foundation +Home page: https://iceberg.apache.org/ +License: https://www.apache.org/licenses/LICENSE-2.0 \ No newline at end of file diff --git a/NOTICE.txt b/NOTICE.txt index 4e84d594faf..52181286368 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -20,5 +20,8 @@ from the Apache Spark project (www.github.com/apache/spark) Apache Spark Copyright 2014 and onwards The Apache Software Foundation. +Apache Iceberg +Copyright 2017-2024 The Apache Software Foundation + This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff --git a/build.sbt b/build.sbt index 08f524b0f02..060b78fe100 100644 --- a/build.sbt +++ b/build.sbt @@ -577,7 +577,8 @@ lazy val kernelApi = (project in file("kernel/kernel-api")) "com.fasterxml.jackson.core" % "jackson-core" % "2.13.5", "com.fasterxml.jackson.core" % "jackson-annotations" % "2.13.5", "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.5", - + "org.apache.iceberg" % "iceberg-api" % "1.7.1", + "org.apache.iceberg" % "iceberg-core" % "1.7.1", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "junit" % "junit" % "4.13.2" % "test", "com.novocode" % "junit-interface" % "0.11" % "test", diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/IcebergTypeToStructType.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/IcebergTypeToStructType.java new file mode 100644 index 00000000000..503596f462d --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/IcebergTypeToStructType.java @@ -0,0 +1,131 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.util; + +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DecimalType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FieldMetadata; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampNTZType; +import io.delta.kernel.types.TimestampType; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +/** + * Schema visitor to convert Iceberg types to Delta kernel types Most code and behavior is taken + * from {@link org.apache.iceberg.spark.TypeToSparkType} with modifications for copying field IDs to + * FieldMetadata + */ +class IcebergTypeToStructType extends TypeUtil.SchemaVisitor { + @Override + public DataType schema(Schema schema, DataType structType) { + return structType; + } + + public DataType list(Types.ListType list, DataType elementResult) { + return new ArrayType(elementResult, list.isElementOptional()); + } + + @Override + public DataType map(Types.MapType map, DataType keyResult, DataType valueResult) { + return new MapType(keyResult, valueResult, map.isValueOptional()); + } + + @Override + public DataType struct(Types.StructType struct, List fieldResults) { + List fields = struct.fields(); + + List structFields = Lists.newArrayListWithExpectedSize(fieldResults.size()); + for (int i = 0; i < fields.size(); i++) { + Types.NestedField field = fields.get(i); + DataType type = fieldResults.get(i); + StructField structField = + new StructField( + field.name(), + type, + field.isOptional(), + FieldMetadata.builder() + .putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, field.fieldId()) + .build()); + // ToDo: No concept of field comments in Delta StructType? + structFields.add(structField); + } + + return new StructType(structFields); + } + + @Override + public DataType field(Types.NestedField field, DataType fieldResult) { + return fieldResult; + } + + @Override + public DataType primitive(Type.PrimitiveType primitive) { + switch (primitive.typeId()) { + case BOOLEAN: + return BooleanType.BOOLEAN; + case INTEGER: + return IntegerType.INTEGER; + case LONG: + return LongType.LONG; + case FLOAT: + return FloatType.FLOAT; + case DOUBLE: + return DoubleType.DOUBLE; + case DATE: + return DateType.DATE; + case TIME: + throw new UnsupportedOperationException("Spark does not support time fields"); + case TIMESTAMP: + Types.TimestampType ts = (Types.TimestampType) primitive; + if (ts.shouldAdjustToUTC()) { + return TimestampType.TIMESTAMP; + } else { + return TimestampNTZType.TIMESTAMP_NTZ; + } + case STRING: + return StringType.STRING; + case UUID: + // use String + return StringType.STRING; + case FIXED: + return BinaryType.BINARY; + case BINARY: + return BinaryType.BINARY; + case DECIMAL: + Types.DecimalType decimal = (Types.DecimalType) primitive; + return new DecimalType(decimal.precision(), decimal.scale()); + default: + throw new UnsupportedOperationException( + "Cannot convert unknown type to Spark: " + primitive); + } + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java index 31e839dc0b6..0ac0e381ae8 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java @@ -23,6 +23,8 @@ import io.delta.kernel.types.*; import java.util.*; import java.util.stream.Collectors; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.TypeUtil; /** * Utility methods for schema related operations such as validating the schema has no duplicate @@ -77,6 +79,10 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab validateSupportedType(schema); } + public static StructType icebergToDeltaSchema(Schema icebergSchema) { + return (StructType) TypeUtil.visit(icebergSchema, new IcebergTypeToStructType()); + } + /** * Verify the partition columns exists in the table schema and a supported data type for a * partition column. diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/SchemaUtilsSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/SchemaUtilsSuite.scala index 6e9381eee03..07402e7e2da 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/SchemaUtilsSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/SchemaUtilsSuite.scala @@ -18,7 +18,10 @@ package io.delta.kernel.internal.util import io.delta.kernel.exceptions.KernelException import io.delta.kernel.internal.util.SchemaUtils.validateSchema import io.delta.kernel.types.IntegerType.INTEGER -import io.delta.kernel.types.{ArrayType, MapType, StringType, StructType} +import io.delta.kernel.types.{ArrayType, BinaryType, BooleanType, DateType, DecimalType, DoubleType, FieldMetadata, FloatType, LongType, MapType, StringType, StructType, TimestampNTZType, TimestampType} +import org.apache.iceberg.Schema +import org.apache.iceberg.types.Types +import org.apache.iceberg.types.Types.NestedField.{optional, required} import org.scalatest.funsuite.AnyFunSuite import java.util.Locale @@ -273,4 +276,66 @@ class SchemaUtilsSuite extends AnyFunSuite { } } } + + /////////////////////////////////////////////////////////////////////////// + // Iceberg Schema Conversion + /////////////////////////////////////////////////////////////////////////// + test("check Iceberg schema conversion") { + val icebergSchema = new Schema(Types.StructType.of( + optional(1, "intCol", Types.IntegerType.get), + optional(3, "longCol", Types.LongType.get), + required(9, "doubleCol", Types.DoubleType.get), + required(10, "uuidCol", Types.UUIDType.get), + optional(2, "booleanCol", Types.BooleanType.get), + optional(21, "fixedCol", Types.FixedType.ofLength(4096)), + required(22, "binaryCol", Types.BinaryType.get), + required(23, "stringCol", Types.StringType.get), + required(25, "floatCol", Types.FloatType.get), + optional(30, "dateCol", Types.DateType.get), + required(34, "timestampCol", Types.TimestampType.withZone), + required(35, "timestampNtzCol", Types.TimestampType.withoutZone()), + required(114, "dec_9_0", Types.DecimalType.of(9, 0)), + required(115, "dec_11_2", Types.DecimalType.of(11, 2)), + required(116, "dec_38_10", Types.DecimalType.of(38, 10)), + required(117, "struct", Types.StructType.of( + Types.NestedField.required(118, "nested_struct", Types.StructType.of( + Types.NestedField.required(119, "nested_integer", Types.IntegerType.get()), + Types.NestedField.optional(120, "nested_string", Types.StringType.get()))))), + required(121, "map", Types.MapType.ofRequired( + 122, 123, Types.StringType.get(), Types.StringType.get())), + optional(124, "array", Types.ListType.ofRequired(125, Types.LongType.get()))).fields()) + + val convertedSchema = SchemaUtils.icebergToDeltaSchema(icebergSchema) + + val expectedDeltaSchema = new StructType() + .add("intCol", INTEGER, true, icebergFieldMetadata(1)) + .add("longCol", LongType.LONG, true, icebergFieldMetadata(3)) + .add("doubleCol", DoubleType.DOUBLE, false, icebergFieldMetadata(9)) + .add("uuidCol", StringType.STRING, false, icebergFieldMetadata(10)) + .add("booleanCol", BooleanType.BOOLEAN, true, icebergFieldMetadata(2)) + .add("fixedCol", BinaryType.BINARY, true, icebergFieldMetadata(21)) + .add("binaryCol", BinaryType.BINARY, false, icebergFieldMetadata(22)) + .add("stringCol", StringType.STRING, false, icebergFieldMetadata(23)) + .add("floatCol", FloatType.FLOAT, false, icebergFieldMetadata(25)) + .add("dateCol", DateType.DATE, true, icebergFieldMetadata(30)) + .add("timestampCol", TimestampType.TIMESTAMP, false, icebergFieldMetadata(34)) + .add("timestampNtzCol", TimestampNTZType.TIMESTAMP_NTZ, false, icebergFieldMetadata(35)) + .add("dec_9_0", new DecimalType(9, 0), false, icebergFieldMetadata(114)) + .add("dec_11_2", new DecimalType(11, 2), false, icebergFieldMetadata(115)) + .add("dec_38_10", new DecimalType(38, 10), false, icebergFieldMetadata(116)) + .add("struct", new StructType() + .add("nested_struct", + new StructType().add("nested_integer", INTEGER, false, icebergFieldMetadata(119)) + .add("nested_string", StringType.STRING, true, icebergFieldMetadata(120)), + false, icebergFieldMetadata(118)), false, icebergFieldMetadata(117)) + .add("map", new MapType(StringType.STRING, StringType.STRING, false), + false, icebergFieldMetadata(121)) + .add("array", new ArrayType(LongType.LONG, false), true, icebergFieldMetadata(124)) + + assert(convertedSchema.equals(expectedDeltaSchema)) + } + + private def icebergFieldMetadata(id: Int): FieldMetadata = { + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, id).build() + } }