Commit 69b6a46
support multi version
FANNG1 committed May 16, 2024
1 parent ffec87a commit 69b6a46
Showing 68 changed files with 1,098 additions and 230 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
@@ -615,7 +615,7 @@ tasks {
register("copySubprojectDependencies", Copy::class) {
subprojects.forEach() {
if (!it.name.startsWith("catalog") &&
-!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark-connector") && it.name != "trino-connector" &&
+!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && it.name != "trino-connector" &&
it.name != "integration-test" && it.name != "bundled-catalog"
) {
from(it.configurations.runtimeClasspath)
@@ -629,7 +629,7 @@ tasks {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("client") &&
!it.name.startsWith("filesystem") &&
-!it.name.startsWith("spark-connector") &&
+!it.name.startsWith("spark") &&
it.name != "trino-connector" &&
it.name != "integration-test" &&
it.name != "bundled-catalog"
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
@@ -29,7 +29,7 @@ caffeine = "2.9.3"
rocksdbjni = "7.10.2"
iceberg = '1.3.1' # 1.4.0 causes test to fail
trino = '426'
-spark = "3.4.1" # 3.5.0 causes tests to fail
+spark = "3.4.1"
scala-collection-compat = "2.7.0"
scala-java-compat = "1.0.2"
sqlite-jdbc = "3.42.0.0"
@@ -49,7 +49,7 @@ selenium = "3.141.59"
rauschig = "1.2.0"
mybatis = "3.5.6"
h2db = "1.4.200"
-kyuubi = "1.8.2"
+kyuubi = "1.7.4"
kafka = "3.4.0"
curator = "2.12.0"
awaitility = "4.2.1"
4 changes: 0 additions & 4 deletions integration-test/build.gradle.kts
@@ -24,10 +24,6 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
-testImplementation(project(":spark-connector:spark-connector")) {
-  exclude("org.apache.hadoop", "hadoop-client-api")
-  exclude("org.apache.hadoop", "hadoop-client-runtime")
-}

testImplementation(libs.commons.cli)
testImplementation(libs.commons.lang3)
2 changes: 1 addition & 1 deletion settings.gradle.kts
@@ -28,7 +28,7 @@ include(
"clients:client-python"
)
include("trino-connector")
-include("spark-connector:spark-connector", "spark-connector:spark-connector-runtime")
+include("spark-connector:spark-connector-common", "spark-connector:spark3.3", "spark-connector:spark3.4", "spark-connector:spark3.5")
include("web")
include("docs")
include("integration-test-common")
154 changes: 154 additions & 0 deletions spark-connector/spark-connector-common/build.gradle.kts
@@ -0,0 +1,154 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
plugins {
`maven-publish`
id("java")
id("idea")
alias(libs.plugins.shadow)
}

repositories {
mavenCentral()
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = "3.3.4"
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg.get()
val kyuubiVersion: String = "1.7.4"
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

dependencies {
implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(libs.guava)
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")

compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")

annotationProcessor(libs.lombok)
compileOnly(libs.lombok)

testAnnotationProcessor(libs.lombok)
testCompileOnly(libs.lombok)

testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":core")) {
// use log from spark, to avoid java.lang.NoSuchMethodError: org.apache.logging.slf4j.Log4jLoggerFactory: method <init>()V not found
exclude("org.slf4j")
exclude("org.apache.logging.log4j")
}
testImplementation(project(":server")) {
exclude("org.slf4j")
exclude("org.apache.logging.log4j")
}
testImplementation(project(":server-common")) {
exclude("org.slf4j")
exclude("org.apache.logging.log4j")
}

testImplementation(libs.hive2.common) {
exclude("org.apache.curator")
// use hadoop from Spark
exclude("org.apache.hadoop")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
}
testImplementation(libs.hive2.metastore) {
exclude("co.cask.tephra")
exclude("com.github.joshelser")
exclude("com.google.code.findbugs", "jsr305")
exclude("com.google.code.findbugs", "sr305")
exclude("com.tdunning", "json")
exclude("com.zaxxer", "HikariCP")
exclude("io.dropwizard.metricss")
exclude("javax.transaction", "transaction-api")
exclude("org.apache.avro")
exclude("org.apache.curator")
exclude("org.apache.hbase")
exclude("org.apache.hadoop")
exclude("org.apache.parquet", "parquet-hadoop-bundle")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
exclude("org.slf4j")
}
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.mysql.driver)
testImplementation(libs.testcontainers)

testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
// conflict with Gravitino server jersey
exclude("org.glassfish.jersey.core")
exclude("org.glassfish.jersey.containers")
exclude("org.glassfish.jersey.inject")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")

testRuntimeOnly(libs.junit.jupiter.engine)
}

tasks.test {
val skipUTs = project.hasProperty("skipTests")
if (skipUTs) {
// Only run integration tests
include("**/integration/**")
}

val testMode = project.properties["testMode"] as? String ?: "embedded"

val skipITs = project.hasProperty("skipITs")
if (skipITs || testMode == "embedded") {
// Exclude integration tests
exclude("**/integration/**")
} else {
dependsOn(tasks.jar)

doFirst {
environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.10")
}

val init = project.extra.get("initIntegrationTest") as (Test) -> Unit
init(this)
}
}

tasks.clean {
delete("spark-warehouse")
}

val testJar by tasks.registering(Jar::class) {
archiveClassifier.set("tests")
from(sourceSets["test"].output)
}

configurations {
create("testArtifacts")
}

artifacts {
add("testArtifacts", testJar)
}

tasks.register<Copy>("copy") {
from(configurations.testRuntimeClasspath)
into("build/libs-runtime")
}

tasks.register<Copy>("copy2") {
from(configurations.testCompileClasspath)
into("build/libs-compile")
}
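With this build setup, only unit tests run by default: testMode defaults to "embedded", which excludes everything under **/integration/**. Assuming the module path from settings.gradle.kts above, integration tests can be enabled with something like ./gradlew :spark-connector:spark-connector-common:test -PtestMode=deployed, while -PskipITs and -PskipTests skip the integration and unit halves respectively.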
90 changes: 90 additions & 0 deletions spark-connector/spark-connector-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter.java
@@ -0,0 +1,90 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.spark.connector;

import com.google.common.base.Preconditions;
import org.apache.spark.sql.connector.catalog.TableChange;

public class SparkTableChangeConverter {
private SparkTypeConverter sparkTypeConverter;

public SparkTableChangeConverter(SparkTypeConverter sparkTypeConverter) {
this.sparkTypeConverter = sparkTypeConverter;
}

public com.datastrato.gravitino.rel.TableChange toGravitinoTableChange(TableChange change) {
if (change instanceof TableChange.SetProperty) {
TableChange.SetProperty setProperty = (TableChange.SetProperty) change;
return com.datastrato.gravitino.rel.TableChange.setProperty(
setProperty.property(), setProperty.value());
} else if (change instanceof TableChange.RemoveProperty) {
TableChange.RemoveProperty removeProperty = (TableChange.RemoveProperty) change;
return com.datastrato.gravitino.rel.TableChange.removeProperty(removeProperty.property());
} else if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
return com.datastrato.gravitino.rel.TableChange.addColumn(
addColumn.fieldNames(),
sparkTypeConverter.toGravitinoType(addColumn.dataType()),
addColumn.comment(),
transformColumnPosition(addColumn.position()),
addColumn.isNullable());
} else if (change instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change;
return com.datastrato.gravitino.rel.TableChange.deleteColumn(
deleteColumn.fieldNames(), deleteColumn.ifExists());
} else if (change instanceof TableChange.UpdateColumnType) {
TableChange.UpdateColumnType updateColumnType = (TableChange.UpdateColumnType) change;
return com.datastrato.gravitino.rel.TableChange.updateColumnType(
updateColumnType.fieldNames(),
sparkTypeConverter.toGravitinoType(updateColumnType.newDataType()));
} else if (change instanceof TableChange.RenameColumn) {
TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) change;
return com.datastrato.gravitino.rel.TableChange.renameColumn(
renameColumn.fieldNames(), renameColumn.newName());
} else if (change instanceof TableChange.UpdateColumnPosition) {
TableChange.UpdateColumnPosition sparkUpdateColumnPosition =
(TableChange.UpdateColumnPosition) change;
com.datastrato.gravitino.rel.TableChange.UpdateColumnPosition gravitinoUpdateColumnPosition =
(com.datastrato.gravitino.rel.TableChange.UpdateColumnPosition)
com.datastrato.gravitino.rel.TableChange.updateColumnPosition(
sparkUpdateColumnPosition.fieldNames(),
transformColumnPosition(sparkUpdateColumnPosition.position()));
Preconditions.checkArgument(
!(gravitinoUpdateColumnPosition.getPosition()
instanceof com.datastrato.gravitino.rel.TableChange.Default),
"Doesn't support alter column position without specifying position");
return gravitinoUpdateColumnPosition;
} else if (change instanceof TableChange.UpdateColumnComment) {
TableChange.UpdateColumnComment updateColumnComment =
(TableChange.UpdateColumnComment) change;
return com.datastrato.gravitino.rel.TableChange.updateColumnComment(
updateColumnComment.fieldNames(), updateColumnComment.newComment());
} else if (change instanceof TableChange.UpdateColumnNullability) {
TableChange.UpdateColumnNullability updateColumnNullability =
(TableChange.UpdateColumnNullability) change;
return com.datastrato.gravitino.rel.TableChange.updateColumnNullability(
updateColumnNullability.fieldNames(), updateColumnNullability.nullable());
} else {
throw new UnsupportedOperationException(
String.format("Unsupported table change %s", change.getClass().getName()));
}
}

private com.datastrato.gravitino.rel.TableChange.ColumnPosition transformColumnPosition(
TableChange.ColumnPosition columnPosition) {
if (null == columnPosition) {
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.defaultPos();
} else if (columnPosition instanceof TableChange.First) {
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.first();
} else if (columnPosition instanceof TableChange.After) {
TableChange.After after = (TableChange.After) columnPosition;
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.after(after.column());
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported table column position %s", columnPosition.getClass().getName()));
}
}
}
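The converter gives every Spark version module a single place that maps Spark's DataSourceV2 TableChange variants onto Gravitino's TableChange model. A minimal usage sketch (hypothetical demo class, not part of this commit; it assumes SparkTypeConverter's no-arg constructor):

import org.apache.spark.sql.connector.catalog.TableChange;

// Hypothetical demo, not from this commit: convert one Spark DSv2 change.
public class TableChangeConverterDemo {
  public static void main(String[] args) {
    SparkTableChangeConverter converter =
        new SparkTableChangeConverter(new SparkTypeConverter());
    // Spark's static factories build DSv2 TableChange instances.
    TableChange sparkChange = TableChange.setProperty("owner", "demo");
    com.datastrato.gravitino.rel.TableChange gravitinoChange =
        converter.toGravitinoTableChange(sparkChange);
    System.out.println(gravitinoChange);
  }
}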
10 changes: 2 additions & 8 deletions spark-connector/spark-connector-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter.java
@@ -30,13 +30,12 @@
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.TimestampNTZType;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.types.VarcharType;

/** Transform DataTypes between Gravitino and Spark. */
public class SparkTypeConverter {
-public static Type toGravitinoType(DataType sparkType) {
+public Type toGravitinoType(DataType sparkType) {
if (sparkType instanceof ByteType) {
return Types.ByteType.get();
} else if (sparkType instanceof ShortType) {
@@ -68,8 +67,6 @@ public static Type toGravitinoType(DataType sparkType) {
return Types.DateType.get();
} else if (sparkType instanceof TimestampType) {
return Types.TimestampType.withTimeZone();
-} else if (sparkType instanceof TimestampNTZType) {
-  return Types.TimestampType.withoutTimeZone();
} else if (sparkType instanceof ArrayType) {
ArrayType arrayType = (ArrayType) sparkType;
return Types.ListType.of(toGravitinoType(arrayType.elementType()), arrayType.containsNull());
@@ -98,7 +95,7 @@ public static Type toGravitinoType(DataType sparkType) {
throw new UnsupportedOperationException("Not support " + sparkType.toString());
}

-public static DataType toSparkType(Type gravitinoType) {
+public DataType toSparkType(Type gravitinoType) {
if (gravitinoType instanceof Types.ByteType) {
return DataTypes.ByteType;
} else if (gravitinoType instanceof Types.ShortType) {
@@ -131,9 +128,6 @@ public static DataType toSparkType(Type gravitinoType) {
} else if (gravitinoType instanceof Types.TimestampType
&& ((Types.TimestampType) gravitinoType).hasTimeZone()) {
return DataTypes.TimestampType;
-} else if (gravitinoType instanceof Types.TimestampType
-    && !((Types.TimestampType) gravitinoType).hasTimeZone()) {
-  return DataTypes.TimestampNTZType;
} else if (gravitinoType instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) gravitinoType;
return DataTypes.createArrayType(
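Making both converter methods instance methods (and dropping TimestampNTZType here) lets the common module compile against Spark 3.3, while newer Spark modules layer the type back in by overriding. A sketch of what such a per-version subclass could look like (class name and placement are assumptions; the actual spark3.4/spark3.5 converters are not shown in this excerpt):

import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.TimestampNTZType;

// Hypothetical subclass: restores the TimestampNTZ mapping on Spark versions
// that support it and defers all other types to the common converter.
public class SparkTypeConverter34 extends SparkTypeConverter {
  @Override
  public Type toGravitinoType(DataType sparkType) {
    if (sparkType instanceof TimestampNTZType) {
      return Types.TimestampType.withoutTimeZone();
    }
    return super.toGravitinoType(sparkType);
  }

  @Override
  public DataType toSparkType(Type gravitinoType) {
    if (gravitinoType instanceof Types.TimestampType
        && !((Types.TimestampType) gravitinoType).hasTimeZone()) {
      return DataTypes.TimestampNTZType;
    }
    return super.toSparkType(gravitinoType);
  }
}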
(Remaining changed files not shown.)
