Skip to content

Commit

Permalink
Add IT test on provider creation with glue metastore (#151)
Browse files Browse the repository at this point in the history
Co-authored-by: Antonio Murgia <[email protected]>
  • Loading branch information
matar993 and agilelab-tmnd1991 authored Jan 5, 2024
1 parent 751bc45 commit f01fbb4
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 91 deletions.
167 changes: 83 additions & 84 deletions client-spark/src/test/java/io/whitefox/api/client/ApiUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.whitefox.api.utils.ApiClient;
import io.whitefox.api.utils.ApiException;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand All @@ -14,99 +13,99 @@
import java.util.function.Supplier;

public class ApiUtils {
/**
* Returns the result of the call of the first argument (f) unless it throws an ApiException with HTTP status code 409,
* in that case, returns the result of the call of the second argument (defaultValue).
* If defaultValue is not dynamic, you can use also {@link ApiUtils#recoverConflict recoverConflinct}.
*/
public static <T> T recoverConflictLazy(Supplier<T> f, Supplier<T> defaultValue) {
try {
return f.get();
} catch (ApiException e) {
if (e.getCode() == 409) {
return defaultValue.get();
} else {
throw e;
}
}
/**
* Returns the result of the call of the first argument (f) unless it throws an ApiException with HTTP status code 409,
* in that case, returns the result of the call of the second argument (defaultValue).
* If defaultValue is not dynamic, you can use also {@link ApiUtils#recoverConflict recoverConflinct}.
*/
public static <T> T recoverConflictLazy(Supplier<T> f, Supplier<T> defaultValue) {
try {
return f.get();
} catch (ApiException e) {
if (e.getCode() == 409) {
return defaultValue.get();
} else {
throw e;
}
}
}

/**
* Returns the result of the call of the first argument (f) unless it throws an ApiException with HTTP status code 409,
* in that case, returns the second argument (defaultValue).
* If defaultValue is dynamic, you can use also {@link ApiUtils#recoverConflictLazy recoverConflictLazy}.
*/
public static <T> T recoverConflict(Supplier<T> f, T defaultValue) {
return recoverConflictLazy(f, new Supplier<T>() {
@Override
public T get() {
return defaultValue;
}
});
}
/**
* Returns the result of the call of the first argument (f) unless it throws an ApiException with HTTP status code 409,
* in that case, returns the second argument (defaultValue).
* If defaultValue is dynamic, you can use also {@link ApiUtils#recoverConflictLazy recoverConflictLazy}.
*/
public static <T> T recoverConflict(Supplier<T> f, T defaultValue) {
return recoverConflictLazy(f, new Supplier<T>() {
@Override
public T get() {
return defaultValue;
}
});
}

/**
* Calls the first argument (f), if the call throws an ApiException with HTTP status code 409 will swallow the exception.
*/
public static <T> void ignoreConflict(Supplier<T> f) {
recoverConflict(f, null);
}
/**
* Calls the first argument (f), if the call throws an ApiException with HTTP status code 409 will swallow the exception.
*/
public static <T> void ignoreConflict(Supplier<T> f) {
recoverConflict(f, null);
}

private static final ObjectMapper objectMapper = new ObjectMapper();
public static final String ENDPOINT_FIELD_NAME = "endpoint";
public static final String BEARER_TOKEN_FIELD_NAME = "bearerToken";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static final String ENDPOINT_FIELD_NAME = "endpoint";
public static final String BEARER_TOKEN_FIELD_NAME = "bearerToken";

/**
* Reads a resource named as the parameter, parses it following
* <a href="https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format">delta sharing specification</a>
* and configures an {@link ApiClient ApiClient} accordingly.
*/
public static ApiClient configureApiClientFromResource(String resourceName) {
try (InputStream is = ApiUtils.class.getClassLoader().getResourceAsStream(resourceName)) {
return configureClientInternal(objectMapper.reader().readTree(is));
} catch (IOException e) {
throw new RuntimeException(String.format("Cannot read %s", resourceName), e);
} catch (NullPointerException e) {
throw new RuntimeException(String.format("Cannot find resource %s", resourceName), e);
}
/**
* Reads a resource named as the parameter, parses it following
* <a href="https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format">delta sharing specification</a>
* and configures an {@link ApiClient ApiClient} accordingly.
*/
public static ApiClient configureApiClientFromResource(String resourceName) {
try (InputStream is = ApiUtils.class.getClassLoader().getResourceAsStream(resourceName)) {
return configureClientInternal(objectMapper.reader().readTree(is));
} catch (IOException e) {
throw new RuntimeException(String.format("Cannot read %s", resourceName), e);
} catch (NullPointerException e) {
throw new RuntimeException(String.format("Cannot find resource %s", resourceName), e);
}
}

/**
* Reads a local file named as the parameter, parses it following
* <a href="https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format">delta sharing specification</a>
* and configures an {@link ApiClient ApiClient} accordingly.
*/
public static ApiClient configureClientFromFile(File file) {
try (InputStream is = new FileInputStream(file)) {
return configureClientInternal(objectMapper.reader().readTree(is));
} catch (IOException e) {
throw new RuntimeException(String.format("Cannot read %s", file), e);
}
/**
* Reads a local file named as the parameter, parses it following
* <a href="https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format">delta sharing specification</a>
* and configures an {@link ApiClient ApiClient} accordingly.
*/
public static ApiClient configureClientFromFile(File file) {
try (InputStream is = new FileInputStream(file)) {
return configureClientInternal(objectMapper.reader().readTree(is));
} catch (IOException e) {
throw new RuntimeException(String.format("Cannot read %s", file), e);
}
}

private static ApiClient configureClientInternal(JsonNode conf) {
var endpointText = getRequiredField(conf, ENDPOINT_FIELD_NAME).asText();
var token = getRequiredField(conf, BEARER_TOKEN_FIELD_NAME).asText();
try {
var endpoint = new URI(endpointText);
var apiClient = new ApiClient();
apiClient.setHost(endpoint.getHost());
apiClient.setPort(endpoint.getPort());
apiClient.setScheme(endpoint.getScheme());
apiClient.setRequestInterceptor(
builder -> builder.header("Authorization", String.format("Bearer %s", token)));
return apiClient;
} catch (URISyntaxException u) {
throw new RuntimeException(String.format("Invalid endpoint syntax %s", endpointText), u);
}
private static ApiClient configureClientInternal(JsonNode conf) {
var endpointText = getRequiredField(conf, ENDPOINT_FIELD_NAME).asText();
var token = getRequiredField(conf, BEARER_TOKEN_FIELD_NAME).asText();
try {
var endpoint = new URI(endpointText);
var apiClient = new ApiClient();
apiClient.setHost(endpoint.getHost());
apiClient.setPort(endpoint.getPort());
apiClient.setScheme(endpoint.getScheme());
apiClient.setRequestInterceptor(
builder -> builder.header("Authorization", String.format("Bearer %s", token)));
return apiClient;
} catch (URISyntaxException u) {
throw new RuntimeException(String.format("Invalid endpoint syntax %s", endpointText), u);
}
}

private static JsonNode getRequiredField(JsonNode node, String fieldName) {
if (node.has(fieldName)) {
return node.get(fieldName);
} else {
throw new RuntimeException(
String.format("Cannot find required field %s in %s", fieldName, node));
}
private static JsonNode getRequiredField(JsonNode node, String fieldName) {
if (node.has(fieldName)) {
return node.get(fieldName);
} else {
throw new RuntimeException(
String.format("Cannot find required field %s in %s", fieldName, node));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import com.github.mrpowers.spark.fast.tests.DatasetComparer;
import io.whitefox.api.client.model.CreateMetastore;
import io.whitefox.api.client.model.Metastore;
import io.whitefox.api.client.model.Provider;
import io.whitefox.api.models.MrFoxDeltaTableSchema;
import io.whitefox.api.utils.SparkUtil;
import io.whitefox.api.utils.StorageManagerInitializer;
Expand Down Expand Up @@ -63,9 +63,10 @@ void showS3Table1withQueryTableApi() {
}

@Test
void createGlueMetastore() {
Metastore metastore = storageManagerInitializer.createGlueMetastore();
assertEquals(metastore.getName(), "MrFoxMetastore");
assertEquals(metastore.getType(), CreateMetastore.TypeEnum.GLUE.getValue());
void createProviderWithGlueMetastore() {
Provider provider = storageManagerInitializer.createProviderWithGlueMetastore();
assertEquals(provider.getStorage().getName(), "MrFoxStorage");
assertEquals(provider.getMetastore().getName(), "MrFoxMetastore");
assertEquals(provider.getMetastore().getType(), CreateMetastore.TypeEnum.GLUE.getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ public void createS3DeltaTable() {
addTableToSchemaRequest(providerRequest.getName(), createTableRequest.getName())));
}

public Metastore createGlueMetastore() {
public Provider createProviderWithGlueMetastore() {
var metastoreRequest = createMetastoreRequest(s3TestConfig, CreateMetastore.TypeEnum.GLUE);
return ApiUtils.recoverConflictLazy(
var metastore = ApiUtils.recoverConflictLazy(
() -> metastoreV1Api.createMetastore(metastoreRequest),
() -> metastoreV1Api.describeMetastore(metastoreRequest.getName()));
var providerRequest = addProviderRequest(Optional.of(metastore.getName()), TableFormat.iceberg);
return ApiUtils.recoverConflictLazy(
() -> providerV1Api.addProvider(providerRequest),
() -> providerV1Api.getProvider(providerRequest.getName()));
}

private String createSchemaRequest(TableFormat tableFormat) {
Expand Down

0 comments on commit f01fbb4

Please sign in to comment.