From 28dfd354ef2a0bf5dd00242116d666570560cc18 Mon Sep 17 00:00:00 2001 From: deepgarg-visa <149145061+deepgarg-visa@users.noreply.github.com> Date: Mon, 22 Jul 2024 13:37:37 +0530 Subject: [PATCH] fix(spark-lineage): default timeout for future responses (#10947) --- .../main/java/datahub/client/MetadataResponseFuture.java | 2 +- .../src/main/java/datahub/client/rest/RestEmitter.java | 4 ++++ .../src/main/java/datahub/spark/DatahubEventEmitter.java | 7 +++++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/MetadataResponseFuture.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/MetadataResponseFuture.java index 11be10186f1ef3..1734b594064ccb 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/MetadataResponseFuture.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/MetadataResponseFuture.java @@ -69,7 +69,7 @@ public MetadataWriteResponse get(long timeout, TimeUnit unit) return mapper.map(response); } else { // We wait for the callback to fill this out - responseLatch.await(); + responseLatch.await(timeout, unit); return responseReference.get(); } } diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java index dd6a7ba98c87df..e1017372be124b 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java @@ -250,6 +250,7 @@ public void completed(SimpleHttpResponse response) { @Override public void failed(Exception ex) { + responseLatch.countDown(); if (callback != null) { try { callback.onFailure(ex); @@ -261,6 +262,7 @@ public void failed(Exception ex) { @Override public void cancelled() { + responseLatch.countDown(); if (callback != null) { try { callback.onFailure(new RuntimeException("Cancelled")); @@ -344,6 +346,7 @@ public void completed(SimpleHttpResponse response) { @Override public void failed(Exception ex) { + responseLatch.countDown(); if (callback != null) { try { callback.onFailure(ex); @@ -355,6 +358,7 @@ public void failed(Exception ex) { @Override public void cancelled() { + responseLatch.countDown(); if (callback != null) { try { callback.onFailure(new RuntimeException("Cancelled")); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java index 5a3f4bd27b4157..dc274ad7df3b4e 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -39,6 +39,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -53,6 +55,7 @@ public class DatahubEventEmitter extends EventEmitter { private final List _datahubJobs = new LinkedList<>(); private final Map schemaMap = new HashMap<>(); private SparkLineageConf datahubConf; + private static final int DEFAULT_TIMEOUT_SEC = 10; private final EventFormatter eventFormatter = new EventFormatter(); @@ -386,8 +389,8 @@ protected void emitMcps(List mcps) { .forEach( future -> { try { - log.info(future.get().toString()); - } catch (InterruptedException | ExecutionException e) { + log.info(future.get(DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS).toString()); + } catch (InterruptedException | ExecutionException | TimeoutException e) { // log error, but don't impact thread log.error("Failed to emit metadata to DataHub", e); }