Test the server with the delta sharing client (#129)
Add spark-delta integration test:
Add a new module "client-spark" that runs an integration test against a running whitefox server. The test reads a Delta table stored in an S3 bucket by querying the whitefox server with the official delta-sharing client. To run the integration test, start the server with java -jar server/app/build/quarkus-app/quarkus-run.jar, then run ./gradlew :client-spark:clientSparkTest.
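For orientation, this is the shape of the read the new test performs: the official delta-sharing Spark connector is pointed at a profile file plus a share.schema.table coordinate. A minimal sketch, assuming the delta-sharing-spark and spark-sql dependencies declared below are on the classpath (the class name and profile path are illustrative, not part of this commit):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DeltaSharingReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("delta-sharing read sketch")
                .master("local[1]")
                .getOrCreate();

        // <profile file>#<share>.<schema>.<table> — the coordinate format used by the test in this commit
        String tablePath = "/path/to/MrFoxProfile.json#s3share.s3schema.s3Table1";

        // "deltaSharing" is the source name registered by the delta-sharing Spark connector
        Dataset<Row> ds = spark.read().format("deltaSharing").load(tablePath);
        ds.show();
    }
}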
matar993 authored Dec 1, 2023
1 parent 3f84ab7 commit 68377bf
Showing 18 changed files with 355 additions and 49 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/compile.yaml
@@ -35,6 +35,16 @@ jobs:
           ./gradlew build testNative --no-daemon
           ./gradlew server:app:printVersion --no-daemon -q
         shell: bash
+      - name: Run integration test
+        shell: bash
+        env:
+          WHITEFOX_TEST_AWS_REGION: ${{ vars.WHITEFOX_AWS_REGION }}
+          WHITEFOX_TEST_AWS_ACCESS_KEY_ID: ${{ secrets.WHITEFOX_AWS_ACCESS_KEY_ID }}
+          WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY: ${{ secrets.WHITEFOX_AWS_SECRET_ACCESS_KEY }}
+        run: |
+          java -jar server/app/build/quarkus-app/quarkus-run.jar &
+          ./gradlew :client-spark:clientSparkTest --no-daemon
+          kill -9 %1
       - name: Build container image
         if: runner.os == 'Linux'
         run: |
86 changes: 86 additions & 0 deletions client-spark/build.gradle.kts
@@ -0,0 +1,86 @@
import org.openapitools.generator.gradle.plugin.tasks.GenerateTask

plugins {
    java
    id("com.diffplug.spotless")
    id("whitefox.java-conventions")
}

repositories {
    mavenCentral()
}

dependencies {
    // OPENAPI
    implementation("org.eclipse.microprofile.openapi:microprofile-openapi-api:3.1.1")
    implementation("org.openapitools:jackson-databind-nullable:0.2.6")
    testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
    testImplementation("jakarta.annotation:jakarta.annotation-api:2.1.1")

    // DELTA
    testImplementation("org.apache.hadoop:hadoop-common:3.3.6")
    testImplementation("io.delta:delta-sharing-spark_2.12:1.0.2")

    // SPARK
    testImplementation("org.apache.spark:spark-core_2.12:3.3.2")
    testImplementation("org.apache.spark:spark-sql_2.12:3.3.2")
    testImplementation("com.github.mrpowers:spark-fast-tests_2.12:1.3.0")

    // JUNIT
    testImplementation("org.junit.jupiter:junit-jupiter:5.8.1")
}

tasks.getByName<Test>("test") {
    useJUnitPlatform {
        excludeTags.add("clientSparkTest")
    }
}

tasks.withType<Test> {
    environment = env.allVariables
    systemProperty("java.util.logging.manager", "java.util.logging.LogManager") // TODO modularize the whitefox-conventions plugin
}

tasks.register<Test>("clientSparkTest") {
    useJUnitPlatform {
        includeTags.add("clientSparkTest")
    }
}

val openApiCodeGenDir = "generated/openapi"
val generatedCodeDirectory = generatedCodeDirectory(layout, openApiCodeGenDir)

val whiteFoxGenerate = tasks.register<GenerateTask>("openapiGenerateClientApi") {
    dependsOn(tasks.spotlessApply)
    generatorName.set("java")
    inputSpec.set("$rootDir/protocol/whitefox-protocol-api.yml")
    library.set("native")
    outputDir.set(generatedCodeDirectory)
    additionalProperties.set(mapOf(
        "apiPackage" to "io.whitefox.api.client",
        "invokerPackage" to "io.whitefox.api.utils",
        "modelPackage" to "io.whitefox.api.client.model",
        "dateLibrary" to "java8",
        "sourceFolder" to "src/gen/java",
        "openApiNullable" to "true",
        "annotationLibrary" to "none",
        "serializationLibrary" to "jackson",
        "useJakartaEe" to "true",
        "useRuntimeException" to "true"
    ))
}

sourceSets {
    getByName("test") {
        java {
            srcDir("${generatedCodeDirectory(layout, openApiCodeGenDir)}/src/gen/java")
        }
    }
}

tasks.withType<JavaCompile> {
    options.encoding = "UTF-8"
    options.compilerArgs.add("-parameters")
    dependsOn(whiteFoxGenerate)
}
61 changes: 61 additions & 0 deletions client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java
@@ -0,0 +1,61 @@
package io.whitefox.api.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.github.mrpowers.spark.fast.tests.DatasetComparer;
import io.whitefox.api.models.MrFoxDeltaTableSchema;
import io.whitefox.api.utils.StorageManagerInitializer;
import java.util.List;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.collection.GenMap;

@Tag("clientSparkTest")
public class ITDeltaSharingClient implements DatasetComparer {

    private final String tablePath = String.format(
            "%s#%s.%s.%s",
            getClass().getClassLoader().getResource("MrFoxProfile.json"),
            "s3share",
            "s3schema",
            "s3Table1");

    private final SparkSession spark = SparkSession.builder()
            .appName("delta sharing client test")
            .config("spark.driver.host", "localhost")
            .master("local[1, 4]")
            .getOrCreate();

    @BeforeAll
    static void initStorageManager() {
        new StorageManagerInitializer().initStorageManager();
    }

    @Test
    void showS3Table1withQueryTableApi() {
        var ds = spark.read().format("deltaSharing").load(tablePath);
        var expectedSchema = new StructType(new StructField[] {
            new StructField("id", DataType.fromDDL("long"), true, new Metadata(GenMap.empty()))
        });
        var expectedData = spark
                .createDataFrame(
                        List.of(
                                new MrFoxDeltaTableSchema(0),
                                new MrFoxDeltaTableSchema(3),
                                new MrFoxDeltaTableSchema(2),
                                new MrFoxDeltaTableSchema(1),
                                new MrFoxDeltaTableSchema(4)),
                        MrFoxDeltaTableSchema.class)
                .toDF();

        assertEquals(expectedSchema.json(), ds.schema().json());
        assertEquals(5, ds.count());
        assertSmallDatasetEquality(ds, expectedData, true, false, false, 500);
    }
}
13 changes: 13 additions & 0 deletions client-spark/src/test/java/io/whitefox/api/models/MrFoxDeltaTableSchema.java
@@ -0,0 +1,13 @@
package io.whitefox.api.models;

public class MrFoxDeltaTableSchema {
    private final long id;

    public MrFoxDeltaTableSchema(long id) {
        this.id = id;
    }

    public long getId() {
        return id;
    }
}
32 changes: 32 additions & 0 deletions client-spark/src/test/java/io/whitefox/api/utils/S3TestConfig.java
@@ -0,0 +1,32 @@
package io.whitefox.api.utils;

public class S3TestConfig {
    private final String region;
    private final String accessKey;
    private final String secretKey;

    public String getRegion() {
        return region;
    }

    public String getAccessKey() {
        return accessKey;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public S3TestConfig(String region, String accessKey, String secretKey) {
        this.region = region;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    public static S3TestConfig loadFromEnv() {
        return new S3TestConfig(
                System.getenv().get("WHITEFOX_TEST_AWS_REGION"),
                System.getenv().get("WHITEFOX_TEST_AWS_ACCESS_KEY_ID"),
                System.getenv().get("WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY"));
    }
}
77 changes: 77 additions & 0 deletions client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java
@@ -0,0 +1,77 @@
package io.whitefox.api.utils;

import io.whitefox.api.client.*;
import io.whitefox.api.client.model.*;
import java.util.List;
import java.util.Map;

public class StorageManagerInitializer {
    private final S3TestConfig s3TestConfig;
    private final StorageV1Api storageV1Api;
    private final ProviderV1Api providerV1Api;
    private final TableV1Api tableV1Api;
    private final ShareV1Api shareV1Api;
    private final SchemaV1Api schemaV1Api;

    public StorageManagerInitializer() {
        var apiClient = new ApiClient();
        this.s3TestConfig = S3TestConfig.loadFromEnv();
        this.storageV1Api = new StorageV1Api(apiClient);
        this.providerV1Api = new ProviderV1Api(apiClient);
        this.tableV1Api = new TableV1Api(apiClient);
        this.shareV1Api = new ShareV1Api(apiClient);
        this.schemaV1Api = new SchemaV1Api(apiClient);
    }

    public void initStorageManager() {
        storageV1Api.createStorage(createStorageRequest(s3TestConfig));
        providerV1Api.addProvider(addProviderRequest());
        tableV1Api.createTableInProvider(addProviderRequest().getName(), createTableRequest());
        shareV1Api.createShare(createShareRequest());
        schemaV1Api.createSchema(createShareRequest().getName(), createSchemaRequest());
        schemaV1Api.addTableToSchema(
                createShareRequest().getName(), createSchemaRequest(), addTableToSchemaRequest());
    }

    private String createSchemaRequest() {
        return "s3schema";
    }

    private AddTableToSchemaRequest addTableToSchemaRequest() {
        return new AddTableToSchemaRequest()
                .name("s3Table1")
                .reference(new TableReference().providerName("MrFoxProvider").name("s3Table1"));
    }

    private CreateShareInput createShareRequest() {
        return new CreateShareInput().name("s3share").recipients(List.of("Mr.Fox")).schemas(List.of());
    }

    private CreateTableInput createTableRequest() {
        return new CreateTableInput()
                .name("s3Table1")
                .skipValidation(true)
                .properties(Map.of(
                        "type", "delta",
                        "location", "s3a://whitefox-s3-test-bucket/delta/samples/delta-table"));
    }

    private ProviderInput addProviderRequest() {
        return new ProviderInput()
                .name("MrFoxProvider")
                .storageName("MrFoxStorage")
                .metastoreName(null);
    }

    private CreateStorage createStorageRequest(S3TestConfig s3TestConfig) {
        return new CreateStorage()
                .name("MrFoxStorage")
                .type(CreateStorage.TypeEnum.S3)
                .properties(new StorageProperties(new S3Properties()
                        .credentials(new SimpleAwsCredentials()
                                .region(s3TestConfig.getRegion())
                                .awsAccessKeyId(s3TestConfig.getAccessKey())
                                .awsSecretAccessKey(s3TestConfig.getSecretKey()))))
                .skipValidation(true);
    }
}
6 changes: 6 additions & 0 deletions client-spark/src/test/resources/MrFoxProfile.json
@@ -0,0 +1,6 @@
{
  "shareCredentialsVersion": 1,
  "endpoint": "http://localhost:8080/delta-api/v1/",
  "bearerToken": "fakeToken",
  "expirationTime": null
}
35 changes: 22 additions & 13 deletions protocol/delta-sharing-protocol-api.yml
@@ -419,6 +419,12 @@ paths:
           description: 'Starting Timestamp ISO8601 format, in the UTC timezone'
           schema:
             type: string
+        - in: header
+          name: delta-sharing-capabilities
+          required: false
+          description: 'Delta Sharing Capabilities'
+          schema:
+            type: string
       requestBody:
         required: true
         content:
@@ -670,7 +676,7 @@ components:
           items:
             type: string
         jsonPredicateHints:
-          type: object
+          type: string
           description: |
             query predicates on partition columns specified using a structured JSON format.
             When it’s present, the server will try to use the predicates to filter table's
@@ -680,19 +686,20 @@
             If the server encounters any errors during predicate processing (for example, invalid
             syntax or non existing columns), it will skip filtering and return all the files.
             When it’s absent, the server will return all the files in the table.
-          properties:
-            op:
-              $ref: '#/components/schemas/Ops'
-            children:
-              type: string
-            name:
-              type: string
-            value:
-              type: string
-            valueType:
-              type: string
+#          properties:
+#            op:
+#              $ref: '#/components/schemas/Ops'
+#            children:
+#              type: string
+#            name:
+#              type: string
+#            value:
+#              type: string
+#            valueType:
+#              type: string
         limitHint:
           type: integer
+          format: int64
           example: 1000
           description: |
             It’s a hint from the client to tell the server how many rows the
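A note on the jsonPredicateHints change above: the field is now declared as type: string, presumably because the client sends the predicate tree as a single JSON-encoded string rather than a structured object, and the old object definition (op, children, name, value, valueType) is kept commented out for reference. An illustrative value in that grammar, not taken from this commit, would be {"op":"equal","children":[{"op":"column","name":"id","valueType":"long"},{"op":"literal","value":"1","valueType":"long"}]}.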
@@ -717,13 +724,15 @@
             timestamp. This is only supported on tables with history sharing enabled.
         startingVersion:
           type: integer
+          format: int64
           example: 1000
           description: |
             an optional version number. If set, will return all data change files
             since startingVersion, inclusive, including historical metadata if seen
             in the delta log.
         endingVersion:
           type: integer
+          format: int64
           example: 1000
           description: |
             an optional version number, only used if startingVersion is set. If set,
@@ -836,7 +845,7 @@
     MetadataObject:
       type: object
       properties:
-        metadata:
+        metaData:
           type: object
           properties:
             id:
4 changes: 2 additions & 2 deletions server/app/build.gradle.kts
@@ -11,12 +11,12 @@ val quarkusPlatformArtifactId: String by project
 val quarkusPlatformVersion: String by project

 // region dependencies
-
+val hadoopVersion = "3.3.6"
 dependencies {
     // INTERNAL
     implementation(project(":server:core"))
     implementation(project(":server:persistence:memory"))

     // QUARKUS
     implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
     implementation("io.quarkus:quarkus-arc")
(diffs for the remaining changed files are not shown here)
