Kernel: Add a conversion utility to convert Iceberg schema to Delta schema #4075

Draft: wants to merge 1 commit into base: master
10 changes: 10 additions & 0 deletions LICENSE.txt
@@ -386,3 +386,13 @@ license, which is reproduced here as well:
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

--------------------------------------------------------------------------------

This product includes code from Apache Iceberg.

* io.delta.kernel.internal.util.IcebergTypeToStructType visitor implementation based on org.apache.iceberg.spark.TypeToSparkType

Copyright: 2018-2024 The Apache Software Foundation
Home page: https://iceberg.apache.org/
License: https://www.apache.org/licenses/LICENSE-2.0
Comment on lines +392 to +398 (Contributor Author):
Need to make sure this attribution is done correctly

3 changes: 3 additions & 0 deletions NOTICE.txt
@@ -20,5 +20,8 @@ from the Apache Spark project (www.github.com/apache/spark)
Apache Spark
Copyright 2014 and onwards The Apache Software Foundation.

Apache Iceberg
Copyright 2017-2024 The Apache Software Foundation
Comment on lines +23 to +24 (Contributor Author):
Same as above. I think this change to the NOTICE is correct; the kernel module currently doesn't have its own LICENSE/NOTICE, so I added this at the root level.


This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
3 changes: 2 additions & 1 deletion build.sbt
@@ -577,7 +577,8 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
"com.fasterxml.jackson.core" % "jackson-core" % "2.13.5",
"com.fasterxml.jackson.core" % "jackson-annotations" % "2.13.5",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.5",

"org.apache.iceberg" % "iceberg-api" % "1.6.1",
"org.apache.iceberg" % "iceberg-core" % "1.6.1",
Comment on lines +580 to +581 (@amogh-jahagirdar, Contributor Author, Jan 20, 2025):

Can't use 1.7.1 because Delta only builds with JDK 8 at the moment, as far as I can tell, with the exception of Spark master, which uses JDK 17 (Spark 4.0 requires JDK 11+).
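The JDK constraint described above can be checked programmatically; here is a dependency-free sketch (a hypothetical helper, not part of this PR) that parses `java.specification.version`, which reports `1.8` on JDK 8 and plain `17` on JDK 17:

```java
class JdkVersion {
    // Returns the JVM's major version number: "1.8" -> 8, "11" -> 11, "17" -> 17.
    static int major(String spec) {
        if (spec.startsWith("1.")) {
            // Pre-JDK-9 scheme: "1.6", "1.7", "1.8".
            return Integer.parseInt(spec.substring(2));
        }
        // JDK 9+ scheme: "9", "11", "17", possibly with a trailing ".x".
        int dot = spec.indexOf('.');
        return Integer.parseInt(dot < 0 ? spec : spec.substring(0, dot));
    }

    public static void main(String[] args) {
        System.out.println(major(System.getProperty("java.specification.version")));
    }
}
```

On a JDK 8 toolchain this prints 8, matching the build constraint the comment describes.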

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
New file (131 additions): IcebergTypeToStructType.java (@@ -0,0 +1,131 @@)
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.util;

import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BinaryType;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DecimalType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FieldMetadata;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TimestampNTZType;
import io.delta.kernel.types.TimestampType;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

/**
 * Schema visitor to convert Iceberg types to Delta Kernel types. Most code and behavior is taken
 * from {@link org.apache.iceberg.spark.TypeToSparkType}, with modifications for copying field IDs
 * to FieldMetadata.
 */
class IcebergTypeToStructType extends TypeUtil.SchemaVisitor<DataType> {
@Override
public DataType schema(Schema schema, DataType structType) {
return structType;
}

@Override
public DataType list(Types.ListType list, DataType elementResult) {
return new ArrayType(elementResult, list.isElementOptional());
}

@Override
public DataType map(Types.MapType map, DataType keyResult, DataType valueResult) {
return new MapType(keyResult, valueResult, map.isValueOptional());
}

@Override
public DataType struct(Types.StructType struct, List<DataType> fieldResults) {
List<Types.NestedField> fields = struct.fields();

List<StructField> structFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
for (int i = 0; i < fields.size(); i++) {
Types.NestedField field = fields.get(i);
DataType type = fieldResults.get(i);
StructField structField =
new StructField(
field.name(),
type,
field.isOptional(),
FieldMetadata.builder()
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, field.fieldId())
.build());
// TODO: No concept of field comments in Delta StructType?
structFields.add(structField);
}

return new StructType(structFields);
}

@Override
public DataType field(Types.NestedField field, DataType fieldResult) {
return fieldResult;
}

@Override
public DataType primitive(Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
case BOOLEAN:
return BooleanType.BOOLEAN;
case INTEGER:
return IntegerType.INTEGER;
case LONG:
return LongType.LONG;
case FLOAT:
return FloatType.FLOAT;
case DOUBLE:
return DoubleType.DOUBLE;
case DATE:
return DateType.DATE;
case TIME:
throw new UnsupportedOperationException("Delta Kernel does not support time fields");
case TIMESTAMP:
Types.TimestampType ts = (Types.TimestampType) primitive;
if (ts.shouldAdjustToUTC()) {
return TimestampType.TIMESTAMP;
} else {
return TimestampNTZType.TIMESTAMP_NTZ;
}
case STRING:
return StringType.STRING;
case UUID:
// use String
return StringType.STRING;
case FIXED:
return BinaryType.BINARY;
case BINARY:
return BinaryType.BINARY;
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return new DecimalType(decimal.precision(), decimal.scale());
default:
throw new UnsupportedOperationException(
"Cannot convert unknown type to Delta: " + primitive);
}
}
}
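For reference, the mapping implemented by `primitive(...)` can be summarized without the Iceberg or Delta Kernel dependencies. The sketch below uses plain strings in place of the real type singletons (all names here are illustrative, not the PR's API):

```java
import java.util.HashMap;
import java.util.Map;

class PrimitiveMappingSketch {
    static final Map<String, String> ICEBERG_TO_DELTA = new HashMap<>();
    static {
        ICEBERG_TO_DELTA.put("boolean", "boolean");
        ICEBERG_TO_DELTA.put("int", "integer");
        ICEBERG_TO_DELTA.put("long", "long");
        ICEBERG_TO_DELTA.put("float", "float");
        ICEBERG_TO_DELTA.put("double", "double");
        ICEBERG_TO_DELTA.put("date", "date");
        ICEBERG_TO_DELTA.put("string", "string");
        // UUIDs have no Delta equivalent and are widened to strings.
        ICEBERG_TO_DELTA.put("uuid", "string");
        // Both fixed-length and variable-length byte arrays map to binary.
        ICEBERG_TO_DELTA.put("fixed", "binary");
        ICEBERG_TO_DELTA.put("binary", "binary");
    }

    static String toDelta(String icebergType) {
        String delta = ICEBERG_TO_DELTA.get(icebergType);
        if (delta == null) {
            // "time" and unknown types are rejected, as in the visitor above.
            throw new UnsupportedOperationException("Cannot convert: " + icebergType);
        }
        return delta;
    }

    public static void main(String[] args) {
        System.out.println(toDelta("uuid"));
    }
}
```

Timestamps are the one case this table cannot capture: the visitor branches on `shouldAdjustToUTC()` to pick `TIMESTAMP` versus `TIMESTAMP_NTZ`, and decimals carry precision and scale through.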
SchemaUtils.java: @@ -23,6 +23,8 @@
import io.delta.kernel.types.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;

/**
* Utility methods for schema related operations such as validating the schema has no duplicate
Expand Down Expand Up @@ -77,6 +79,10 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab
validateSupportedType(schema);
}

/** Converts an Iceberg {@link Schema} to a Delta Kernel {@link StructType}. */
public static StructType icebergToDeltaSchema(Schema icebergSchema) {
return (StructType) TypeUtil.visit(icebergSchema, new IcebergTypeToStructType());
}
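The field-ID propagation that makes this conversion useful for column mapping can be sketched stand-alone. This assumes Delta's column-mapping metadata key is the string `delta.columnMapping.id`; the plain map below stands in for Kernel's `FieldMetadata` builder:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class FieldIdSketch {
    // Assumed key name for Delta's column-mapping field ID.
    static final String COLUMN_MAPPING_ID_KEY = "delta.columnMapping.id";

    // Mirrors the FieldMetadata.builder().putLong(...) call in the visitor's struct() method:
    // each converted field carries its original Iceberg field ID in its metadata.
    static Map<String, Long> fieldMetadata(long icebergFieldId) {
        Map<String, Long> metadata = new LinkedHashMap<>();
        metadata.put(COLUMN_MAPPING_ID_KEY, icebergFieldId);
        return Collections.unmodifiableMap(metadata);
    }

    public static void main(String[] args) {
        System.out.println(fieldMetadata(117L));
    }
}
```

Preserving the Iceberg field ID this way lets Delta's column-mapping mode resolve columns by ID rather than by name, which is what keeps renames compatible across the two formats.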

/**
* Verify the partition columns exists in the table schema and a supported data type for a
* partition column.
SchemaUtilsSuite.scala: @@ -18,7 +18,10 @@ package io.delta.kernel.internal.util
import io.delta.kernel.exceptions.KernelException
import io.delta.kernel.internal.util.SchemaUtils.validateSchema
import io.delta.kernel.types.IntegerType.INTEGER
import io.delta.kernel.types.{ArrayType, MapType, StringType, StructType}
import io.delta.kernel.types.{ArrayType, BinaryType, BooleanType, DateType, DecimalType, DoubleType, FieldMetadata, FloatType, LongType, MapType, StringType, StructType, TimestampNTZType, TimestampType}
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.NestedField.{optional, required}
import org.scalatest.funsuite.AnyFunSuite

import java.util.Locale
@@ -273,4 +276,66 @@ class SchemaUtilsSuite extends AnyFunSuite {
}
}
}

///////////////////////////////////////////////////////////////////////////
// Iceberg Schema Conversion
///////////////////////////////////////////////////////////////////////////
test("check Iceberg schema conversion") {
val icebergSchema = new Schema(Types.StructType.of(
optional(1, "intCol", Types.IntegerType.get),
optional(3, "longCol", Types.LongType.get),
required(9, "doubleCol", Types.DoubleType.get),
required(10, "uuidCol", Types.UUIDType.get),
optional(2, "booleanCol", Types.BooleanType.get),
optional(21, "fixedCol", Types.FixedType.ofLength(4096)),
required(22, "binaryCol", Types.BinaryType.get),
required(23, "stringCol", Types.StringType.get),
required(25, "floatCol", Types.FloatType.get),
optional(30, "dateCol", Types.DateType.get),
required(34, "timestampCol", Types.TimestampType.withZone),
required(35, "timestampNtzCol", Types.TimestampType.withoutZone()),
required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
required(116, "dec_38_10", Types.DecimalType.of(38, 10)),
required(117, "struct", Types.StructType.of(
Types.NestedField.required(118, "nested_struct", Types.StructType.of(
Types.NestedField.required(119, "nested_integer", Types.IntegerType.get()),
Types.NestedField.optional(120, "nested_string", Types.StringType.get()))))),
required(121, "map", Types.MapType.ofRequired(
122, 123, Types.StringType.get(), Types.StringType.get())),
optional(124, "array", Types.ListType.ofRequired(125, Types.LongType.get()))).fields())

val convertedSchema = SchemaUtils.icebergToDeltaSchema(icebergSchema)

val expectedDeltaSchema = new StructType()
.add("intCol", INTEGER, true, icebergFieldMetadata(1))
.add("longCol", LongType.LONG, true, icebergFieldMetadata(3))
.add("doubleCol", DoubleType.DOUBLE, false, icebergFieldMetadata(9))
.add("uuidCol", StringType.STRING, false, icebergFieldMetadata(10))
.add("booleanCol", BooleanType.BOOLEAN, true, icebergFieldMetadata(2))
.add("fixedCol", BinaryType.BINARY, true, icebergFieldMetadata(21))
.add("binaryCol", BinaryType.BINARY, false, icebergFieldMetadata(22))
.add("stringCol", StringType.STRING, false, icebergFieldMetadata(23))
.add("floatCol", FloatType.FLOAT, false, icebergFieldMetadata(25))
.add("dateCol", DateType.DATE, true, icebergFieldMetadata(30))
.add("timestampCol", TimestampType.TIMESTAMP, false, icebergFieldMetadata(34))
.add("timestampNtzCol", TimestampNTZType.TIMESTAMP_NTZ, false, icebergFieldMetadata(35))
.add("dec_9_0", new DecimalType(9, 0), false, icebergFieldMetadata(114))
.add("dec_11_2", new DecimalType(11, 2), false, icebergFieldMetadata(115))
.add("dec_38_10", new DecimalType(38, 10), false, icebergFieldMetadata(116))
.add("struct", new StructType()
.add("nested_struct",
new StructType().add("nested_integer", INTEGER, false, icebergFieldMetadata(119))
.add("nested_string", StringType.STRING, true, icebergFieldMetadata(120)),
false, icebergFieldMetadata(118)), false, icebergFieldMetadata(117))
.add("map", new MapType(StringType.STRING, StringType.STRING, false),
false, icebergFieldMetadata(121))
.add("array", new ArrayType(LongType.LONG, false), true, icebergFieldMetadata(124))

assert(convertedSchema.equals(expectedDeltaSchema))
}

private def icebergFieldMetadata(id: Int): FieldMetadata = {
FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, id).build()
}
}