Skip to content

Commit

Permalink
feat(spark-lineage, java-emitter): Support ssl cert disable verificat…
Browse files Browse the repository at this point in the history
…ion functionality (#5488)

Co-authored-by: Shirshanka Das <[email protected]>
  • Loading branch information
MugdhaHardikar-GSLab and shirshanka authored Jul 27, 2022
1 parent 44122d4 commit d8b2350
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datahub.client.rest;

import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -37,6 +38,15 @@
import datahub.event.UpsertAspectRequest;
import lombok.extern.slf4j.Slf4j;

import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.http.ssl.SSLContextBuilder;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;


@ThreadSafe
@Slf4j
Expand Down Expand Up @@ -86,6 +96,17 @@ public RestEmitter(RestEmitterConfig config) {
.setSocketTimeout(config.getTimeoutSec() * 1000)
.build());
}
if (config.isDisableSslVerification()) {
HttpAsyncClientBuilder httpClientBuilder = this.config.getAsyncHttpClientBuilder();
try {
httpClientBuilder
.setSSLContext(new SSLContextBuilder().loadTrustMaterial(null, TrustAllStrategy.INSTANCE).build())
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
} catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
throw new RuntimeException("Error while creating insecure http client", e);
}
}

this.httpClient = this.config.getAsyncHttpClientBuilder().build();
this.httpClient.start();
this.ingestProposalUrl = this.config.getServer() + "/aspects?action=ingestProposal";
Expand Down Expand Up @@ -313,4 +334,10 @@ public void cancelled() {
Future<HttpResponse> requestFuture = httpClient.execute(httpPost, httpCallback);
return new MetadataResponseFuture(requestFuture, responseAtomicReference, responseLatch);
}

@VisibleForTesting
HttpAsyncClient getHttpClient() {
return this.httpClient;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public class RestEmitterConfig {
private final String server = "http://localhost:8080";

private final Integer timeoutSec;

@Builder.Default
private final boolean disableSslVerification = false;

@Builder.Default
private final String token = DEFAULT_AUTH_TOKEN;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.net.ssl.SSLHandshakeException;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
Expand Down Expand Up @@ -380,4 +384,30 @@ public void testUserAgentHeader() throws IOException, ExecutionException, Interr
request("/config")
.withHeader("User-Agent", "DataHub-RestClient/" + version));
}

@Test
public void testDisableSslVerification() throws IOException, InterruptedException, ExecutionException {
RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().disableSslVerification(true).build());
final String hostWithSsl = "https://self-signed.badssl.com";
final HttpGet request = new HttpGet(hostWithSsl);

final HttpResponse response = restEmitter.getHttpClient().execute(request, null).get();
restEmitter.close();
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}

@Test
public void testSslVerificationException() throws IOException, InterruptedException, ExecutionException {
RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().disableSslVerification(false).build());
final String hostWithSsl = "https://self-signed.badssl.com";
final HttpGet request = new HttpGet(hostWithSsl);
try {
HttpResponse response = restEmitter.getHttpClient().execute(request, null).get();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e instanceof ExecutionException);
Assert.assertTrue(((ExecutionException) e).getCause() instanceof SSLHandshakeException);
}
restEmitter.close();
}
}
1 change: 1 addition & 0 deletions metadata-integration/java/spark-lineage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ spark = SparkSession.builder()
| spark.extraListeners || | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server || | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
| spark.datahub.metadata.pipeline.platformInstance| | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance| | | dataset level platform instance |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class McpEmitter implements LineageConsumer {
private static final String TRANSPORT_KEY = "transport";
private static final String GMS_URL_KEY = "rest.server";
private static final String GMS_AUTH_TOKEN = "rest.token";

private static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
private Optional<Emitter> getEmitter() {
Optional<Emitter> emitter = Optional.empty();
switch (emitterType) {
Expand Down Expand Up @@ -78,12 +78,19 @@ public McpEmitter(Config datahubConf) {
String gmsUrl = datahubConf.hasPath(GMS_URL_KEY) ? datahubConf.getString(GMS_URL_KEY)
: "http://localhost:8080";
String token = datahubConf.hasPath(GMS_AUTH_TOKEN) ? datahubConf.getString(GMS_AUTH_TOKEN) : null;
boolean disableSslVerification = datahubConf.hasPath(DISABLE_SSL_VERIFICATION_KEY) ? datahubConf.getBoolean(
DISABLE_SSL_VERIFICATION_KEY) : false;
log.info("REST Emitter Configuration: GMS url {}{}", gmsUrl,
(datahubConf.hasPath(GMS_URL_KEY) ? "" : "(default)"));
if (token != null) {
log.info("REST Emitter Configuration: Token {}", (token != null) ? "XXXXX" : "(empty)");
}
restEmitterConfig = Optional.of(RestEmitterConfig.builder().server(gmsUrl).token(token).build());
if (disableSslVerification) {
log.warn("REST Emitter Configuration: ssl verification will be disabled.");
}
restEmitterConfig = Optional.of(RestEmitterConfig.builder()
.server(gmsUrl).token(token)
.disableSslVerification(disableSslVerification).build());

break;
default:
Expand Down

0 comments on commit d8b2350

Please sign in to comment.