Skip to content

Commit

Permalink
Kernel: Add a conversion utility to convert Iceberg schema to Delta s…
Browse files Browse the repository at this point in the history
…chema
  • Loading branch information
amogh-jahagirdar committed Jan 20, 2025
1 parent 7769d31 commit d04e988
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 2 deletions.
10 changes: 10 additions & 0 deletions LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,13 @@ license, which is reproduced here as well:
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

--------------------------------------------------------------------------------

This product includes code from Apache Iceberg.

* io.delta.kernel.internal.util.IcebergTypeToStructType visitor implementation based on org.apache.iceberg.spark.TypeToSparkType

Copyright: 2018-2024 The Apache Software Foundation
Home page: https://iceberg.apache.org/
License: https://www.apache.org/licenses/LICENSE-2.0
3 changes: 3 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ from the Apache Spark project (www.github.com/apache/spark)
Apache Spark
Copyright 2014 and onwards The Apache Software Foundation.

Apache Iceberg
Copyright 2017-2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,8 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
"com.fasterxml.jackson.core" % "jackson-core" % "2.13.5",
"com.fasterxml.jackson.core" % "jackson-annotations" % "2.13.5",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.5",

"org.apache.iceberg" % "iceberg-api" % "1.7.1",
"org.apache.iceberg" % "iceberg-core" % "1.7.1",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.util;

import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BinaryType;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DecimalType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FieldMetadata;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TimestampNTZType;
import io.delta.kernel.types.TimestampType;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

/**
* Schema visitor to convert Iceberg types to Delta kernel types Most code and behavior is taken
* from {@link org.apache.iceberg.spark.TypeToSparkType} with modifications for copying field IDs to
* FieldMetadata
*/
class IcebergTypeToStructType extends TypeUtil.SchemaVisitor<DataType> {
@Override
public DataType schema(Schema schema, DataType structType) {
return structType;
}

public DataType list(Types.ListType list, DataType elementResult) {
return new ArrayType(elementResult, list.isElementOptional());
}

@Override
public DataType map(Types.MapType map, DataType keyResult, DataType valueResult) {
return new MapType(keyResult, valueResult, map.isValueOptional());
}

@Override
public DataType struct(Types.StructType struct, List<DataType> fieldResults) {
List<Types.NestedField> fields = struct.fields();

List<StructField> structFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
for (int i = 0; i < fields.size(); i++) {
Types.NestedField field = fields.get(i);
DataType type = fieldResults.get(i);
StructField structField =
new StructField(
field.name(),
type,
field.isOptional(),
FieldMetadata.builder()
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, field.fieldId())
.build());
// ToDo: No concept of field comments in Delta StructType?
structFields.add(structField);
}

return new StructType(structFields);
}

@Override
public DataType field(Types.NestedField field, DataType fieldResult) {
return fieldResult;
}

@Override
public DataType primitive(Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
case BOOLEAN:
return BooleanType.BOOLEAN;
case INTEGER:
return IntegerType.INTEGER;
case LONG:
return LongType.LONG;
case FLOAT:
return FloatType.FLOAT;
case DOUBLE:
return DoubleType.DOUBLE;
case DATE:
return DateType.DATE;
case TIME:
throw new UnsupportedOperationException("Spark does not support time fields");
case TIMESTAMP:
Types.TimestampType ts = (Types.TimestampType) primitive;
if (ts.shouldAdjustToUTC()) {
return TimestampType.TIMESTAMP;
} else {
return TimestampNTZType.TIMESTAMP_NTZ;
}
case STRING:
return StringType.STRING;
case UUID:
// use String
return StringType.STRING;
case FIXED:
return BinaryType.BINARY;
case BINARY:
return BinaryType.BINARY;
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return new DecimalType(decimal.precision(), decimal.scale());
default:
throw new UnsupportedOperationException(
"Cannot convert unknown type to Spark: " + primitive);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.delta.kernel.types.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;

/**
* Utility methods for schema related operations such as validating the schema has no duplicate
Expand Down Expand Up @@ -77,6 +79,10 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab
validateSupportedType(schema);
}

public static StructType icebergToDeltaSchema(Schema icebergSchema) {
return (StructType) TypeUtil.visit(icebergSchema, new IcebergTypeToStructType());
}

/**
* Verify the partition columns exists in the table schema and a supported data type for a
* partition column.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package io.delta.kernel.internal.util
import io.delta.kernel.exceptions.KernelException
import io.delta.kernel.internal.util.SchemaUtils.validateSchema
import io.delta.kernel.types.IntegerType.INTEGER
import io.delta.kernel.types.{ArrayType, MapType, StringType, StructType}
import io.delta.kernel.types.{ArrayType, BinaryType, BooleanType, DateType, DecimalType, DoubleType, FieldMetadata, FloatType, LongType, MapType, StringType, StructType, TimestampNTZType, TimestampType}
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.NestedField.{optional, required}
import org.scalatest.funsuite.AnyFunSuite

import java.util.Locale
Expand Down Expand Up @@ -273,4 +276,66 @@ class SchemaUtilsSuite extends AnyFunSuite {
}
}
}

///////////////////////////////////////////////////////////////////////////
// Iceberg Schema Conversion
///////////////////////////////////////////////////////////////////////////
test("check Iceberg schema conversion") {
val icebergSchema = new Schema(Types.StructType.of(
optional(1, "intCol", Types.IntegerType.get),
optional(3, "longCol", Types.LongType.get),
required(9, "doubleCol", Types.DoubleType.get),
required(10, "uuidCol", Types.UUIDType.get),
optional(2, "booleanCol", Types.BooleanType.get),
optional(21, "fixedCol", Types.FixedType.ofLength(4096)),
required(22, "binaryCol", Types.BinaryType.get),
required(23, "stringCol", Types.StringType.get),
required(25, "floatCol", Types.FloatType.get),
optional(30, "dateCol", Types.DateType.get),
required(34, "timestampCol", Types.TimestampType.withZone),
required(35, "timestampNtzCol", Types.TimestampType.withoutZone()),
required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
required(116, "dec_38_10", Types.DecimalType.of(38, 10)),
required(117, "struct", Types.StructType.of(
Types.NestedField.required(118, "nested_struct", Types.StructType.of(
Types.NestedField.required(119, "nested_integer", Types.IntegerType.get()),
Types.NestedField.optional(120, "nested_string", Types.StringType.get()))))),
required(121, "map", Types.MapType.ofRequired(
122, 123, Types.StringType.get(), Types.StringType.get())),
optional(124, "array", Types.ListType.ofRequired(125, Types.LongType.get()))).fields())

val convertedSchema = SchemaUtils.icebergToDeltaSchema(icebergSchema)

val expectedDeltaSchema = new StructType()
.add("intCol", INTEGER, true, icebergFieldMetadata(1))
.add("longCol", LongType.LONG, true, icebergFieldMetadata(3))
.add("doubleCol", DoubleType.DOUBLE, false, icebergFieldMetadata(9))
.add("uuidCol", StringType.STRING, false, icebergFieldMetadata(10))
.add("booleanCol", BooleanType.BOOLEAN, true, icebergFieldMetadata(2))
.add("fixedCol", BinaryType.BINARY, true, icebergFieldMetadata(21))
.add("binaryCol", BinaryType.BINARY, false, icebergFieldMetadata(22))
.add("stringCol", StringType.STRING, false, icebergFieldMetadata(23))
.add("floatCol", FloatType.FLOAT, false, icebergFieldMetadata(25))
.add("dateCol", DateType.DATE, true, icebergFieldMetadata(30))
.add("timestampCol", TimestampType.TIMESTAMP, false, icebergFieldMetadata(34))
.add("timestampNtzCol", TimestampNTZType.TIMESTAMP_NTZ, false, icebergFieldMetadata(35))
.add("dec_9_0", new DecimalType(9, 0), false, icebergFieldMetadata(114))
.add("dec_11_2", new DecimalType(11, 2), false, icebergFieldMetadata(115))
.add("dec_38_10", new DecimalType(38, 10), false, icebergFieldMetadata(116))
.add("struct", new StructType()
.add("nested_struct",
new StructType().add("nested_integer", INTEGER, false, icebergFieldMetadata(119))
.add("nested_string", StringType.STRING, true, icebergFieldMetadata(120)),
false, icebergFieldMetadata(118)), false, icebergFieldMetadata(117))
.add("map", new MapType(StringType.STRING, StringType.STRING, false),
false, icebergFieldMetadata(121))
.add("array", new ArrayType(LongType.LONG, false), true, icebergFieldMetadata(124))

assert(convertedSchema.equals(expectedDeltaSchema))
}

private def icebergFieldMetadata(id: Int): FieldMetadata = {
FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, id).build()
}
}

0 comments on commit d04e988

Please sign in to comment.