diff --git a/datahub-frontend/app/auth/AuthModule.java b/datahub-frontend/app/auth/AuthModule.java
index 7db8f5689ead5..32dfba00d47db 100644
--- a/datahub-frontend/app/auth/AuthModule.java
+++ b/datahub-frontend/app/auth/AuthModule.java
@@ -63,6 +63,7 @@ public class AuthModule extends AbstractModule {
private static final String ENTITY_CLIENT_RETRY_INTERVAL = "entityClient.retryInterval";
private static final String ENTITY_CLIENT_NUM_RETRIES = "entityClient.numRetries";
private static final String ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE = "entityClient.restli.get.batchSize";
+ private static final String ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY = "entityClient.restli.get.batchConcurrency";
private static final String GET_SSO_SETTINGS_ENDPOINT = "auth/getSsoSettings";
private final com.typesafe.config.Config _configs;
@@ -208,7 +209,8 @@ protected SystemEntityClient provideEntityClient(
new ExponentialBackoff(_configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
_configs.getInt(ENTITY_CLIENT_NUM_RETRIES),
configurationProvider.getCache().getClient().getEntityClient(),
- Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)));
+ Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)),
+ Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY)));
}
@Provides
diff --git a/datahub-frontend/conf/application.conf b/datahub-frontend/conf/application.conf
index 6aa58d5b13b2c..045175ba69f02 100644
--- a/datahub-frontend/conf/application.conf
+++ b/datahub-frontend/conf/application.conf
@@ -289,5 +289,7 @@ entityClient.retryInterval = 2
entityClient.retryInterval = ${?ENTITY_CLIENT_RETRY_INTERVAL}
entityClient.numRetries = 3
entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES}
-entityClient.restli.get.batchSize = 100
-entityClient.restli.get.batchSize = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE}
\ No newline at end of file
+entityClient.restli.get.batchSize = 50
+entityClient.restli.get.batchSize = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE}
+entityClient.restli.get.batchConcurrency = 2
+entityClient.restli.get.batchConcurrency = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY}
\ No newline at end of file
diff --git a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java
index 08ff802c37e40..ba650c25a6117 100644
--- a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java
+++ b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java
@@ -47,7 +47,8 @@ public SystemEntityClient systemEntityClient(
new ExponentialBackoff(1),
1,
configurationProvider.getCache().getClient().getEntityClient(),
- 1);
+ 1,
+ 2);
}
@MockBean public Database ebeanServer;
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 19621dce767c6..4d188bd5c6183 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -386,6 +386,7 @@ entityClient:
restli:
get:
batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit
+ batchConcurrency: ${ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY:2} # number of batchGet requests issued in parallel (thread pool size, minimum 1)
usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java
index 2d9f570e1b07d..9e7255bf43a34 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java
@@ -30,7 +30,8 @@ public EntityClient entityClient(
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
- final @Value("${entityClient.restli.get.batchSize:150}") int batchGetV2Size) {
+ final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
+ final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final Client restClient;
if (gmsUri != null) {
restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol);
@@ -39,7 +40,11 @@ public EntityClient entityClient(
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
}
return new RestliEntityClient(
- restClient, new ExponentialBackoff(retryInterval), numRetries, batchGetV2Size);
+ restClient,
+ new ExponentialBackoff(retryInterval),
+ numRetries,
+ batchGetV2Size,
+ batchGetV2Concurrency);
}
@Bean("systemEntityClient")
@@ -53,7 +58,8 @@ public SystemEntityClient systemEntityClient(
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
final EntityClientCacheConfig entityClientCacheConfig,
- final @Value("${entityClient.restli.get.batchSize:150}") int batchGetV2Size) {
+ final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
+ final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final Client restClient;
if (gmsUri != null) {
@@ -67,6 +73,7 @@ public SystemEntityClient systemEntityClient(
new ExponentialBackoff(retryInterval),
numRetries,
entityClientCacheConfig,
- batchGetV2Size);
+ batchGetV2Size,
+ batchGetV2Concurrency);
}
}
diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
index 70fae208ad77a..fe1ca571efea5 100644
--- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
+++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
@@ -85,8 +85,13 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.mail.MethodNotSupportedException;
@@ -110,14 +115,17 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
private static final RunsRequestBuilders RUNS_REQUEST_BUILDERS = new RunsRequestBuilders();
private final int batchGetV2Size;
+ private final int batchGetV2Concurrency;
public RestliEntityClient(
@Nonnull final Client restliClient,
@Nonnull final BackoffPolicy backoffPolicy,
int retryCount,
- int batchGetV2Size) {
+ int batchGetV2Size,
+ int batchGetV2Concurrency) {
super(restliClient, backoffPolicy, retryCount);
this.batchGetV2Size = Math.max(1, batchGetV2Size);
+ this.batchGetV2Concurrency = Math.max(1, batchGetV2Concurrency); // pool size must be >= 1
}
@Override
@@ -150,7 +158,6 @@ public Entity get(@Nonnull OperationContext opContext, @Nonnull final Urn urn)
*
Batch get a set of {@link Entity} objects by urn.
*
* @param urns the urns of the entities to batch get
- * @param authentication the authentication to include in the request to the Metadata Service
* @throws RemoteInvocationException when unable to execute request
*/
@Override
@@ -216,40 +223,54 @@ public Map batchGetV2(
throws RemoteInvocationException, URISyntaxException {
Map responseMap = new HashMap<>();
+ ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, batchGetV2Concurrency));
- Iterators.partition(urns.iterator(), batchGetV2Size)
- .forEachRemaining(
- batch -> {
- try {
- final EntitiesV2BatchGetRequestBuilder requestBuilder =
- ENTITIES_V2_REQUEST_BUILDERS
- .batchGet()
- .aspectsParam(aspectNames)
- .ids(batch.stream().map(Urn::toString).collect(Collectors.toList()));
-
- responseMap.putAll(
- sendClientRequest(requestBuilder, opContext.getSessionAuthentication())
- .getEntity()
- .getResults()
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- entry -> {
- try {
- return Urn.createFromString(entry.getKey());
- } catch (URISyntaxException e) {
- throw new RuntimeException(
- String.format(
- "Failed to bind urn string with value %s into urn",
- entry.getKey()));
- }
- },
- entry -> entry.getValue().getEntity())));
- } catch (RemoteInvocationException e) {
- throw new RuntimeException(e);
- }
- });
+ try {
+ Iterable> iterable = () -> Iterators.partition(urns.iterator(), batchGetV2Size);
+ List>> futures =
+ StreamSupport.stream(iterable.spliterator(), false)
+ .map(
+ batch ->
+ executor.submit(
+ () -> {
+ try {
+ log.debug("Executing batchGetV2 with batch size: {}", batch.size());
+ final EntitiesV2BatchGetRequestBuilder requestBuilder =
+ ENTITIES_V2_REQUEST_BUILDERS
+ .batchGet()
+ .aspectsParam(aspectNames)
+ .ids(
+ batch.stream()
+ .map(Urn::toString)
+ .collect(Collectors.toList()));
+
+ return sendClientRequest(
+ requestBuilder, opContext.getSessionAuthentication())
+ .getEntity()
+ .getResults()
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> UrnUtils.getUrn(entry.getKey()),
+ entry -> entry.getValue().getEntity()));
+ } catch (RemoteInvocationException e) {
+ throw new RuntimeException(e);
+ }
+ }))
+ .collect(Collectors.toList());
+
+ futures.forEach(
+ result -> {
+ try {
+ responseMap.putAll(result.get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ executor.shutdown();
+ }
return responseMap;
}
@@ -260,7 +281,6 @@ public Map batchGetV2(
* @param entityName the entity type to fetch
* @param versionedUrns the urns of the entities to batch get
* @param aspectNames the aspect names to batch get
- * @param authentication the authentication to include in the request to the Metadata Service
* @throws RemoteInvocationException when unable to execute request
*/
@Override
@@ -272,39 +292,62 @@ public Map batchGetVersionedV2(
@Nullable final Set aspectNames) {
Map responseMap = new HashMap<>();
+ ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, batchGetV2Concurrency));
- Iterators.partition(versionedUrns.iterator(), batchGetV2Size)
- .forEachRemaining(
- batch -> {
- final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder =
- ENTITIES_VERSIONED_V2_REQUEST_BUILDERS
- .batchGet()
- .aspectsParam(aspectNames)
- .entityTypeParam(entityName)
- .ids(
- batch.stream()
- .map(
- versionedUrn ->
- com.linkedin.common.urn.VersionedUrn.of(
- versionedUrn.getUrn().toString(),
- versionedUrn.getVersionStamp()))
- .collect(Collectors.toSet()));
-
- try {
- responseMap.putAll(
- sendClientRequest(requestBuilder, opContext.getSessionAuthentication())
- .getEntity()
- .getResults()
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- entry -> UrnUtils.getUrn(entry.getKey().getUrn()),
- entry -> entry.getValue().getEntity())));
- } catch (RemoteInvocationException e) {
- throw new RuntimeException(e);
- }
- });
+ try {
+ Iterable> iterable =
+ () -> Iterators.partition(versionedUrns.iterator(), batchGetV2Size);
+ List>> futures =
+ StreamSupport.stream(iterable.spliterator(), false)
+ .map(
+ batch ->
+ executor.submit(
+ () -> {
+ try {
+ log.debug(
+ "Executing batchGetVersionedV2 with batch size: {}",
+ batch.size());
+ final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder =
+ ENTITIES_VERSIONED_V2_REQUEST_BUILDERS
+ .batchGet()
+ .aspectsParam(aspectNames)
+ .entityTypeParam(entityName)
+ .ids(
+ batch.stream()
+ .map(
+ versionedUrn ->
+ com.linkedin.common.urn.VersionedUrn.of(
+ versionedUrn.getUrn().toString(),
+ versionedUrn.getVersionStamp()))
+ .collect(Collectors.toSet()));
+
+ return sendClientRequest(
+ requestBuilder, opContext.getSessionAuthentication())
+ .getEntity()
+ .getResults()
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> UrnUtils.getUrn(entry.getKey().getUrn()),
+ entry -> entry.getValue().getEntity()));
+ } catch (RemoteInvocationException e) {
+ throw new RuntimeException(e);
+ }
+ }))
+ .collect(Collectors.toList());
+
+ futures.forEach(
+ result -> {
+ try {
+ responseMap.putAll(result.get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ executor.shutdown();
+ }
return responseMap;
}
@@ -955,7 +998,6 @@ public VersionedAspect getAspectOrNull(
* @param startTimeMillis the earliest desired event time of the aspect value in milliseconds.
* @param endTimeMillis the latest desired event time of the aspect value in milliseconds.
* @param limit the maximum number of desired aspect values.
- * @param authentication the actor associated with the request [internal]
* @return the list of EnvelopedAspect values satisfying the input parameters.
* @throws RemoteInvocationException on remote request error.
*/
diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java
index 364ee9b0519d2..7546d1f0a3b54 100644
--- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java
+++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java
@@ -27,8 +27,9 @@ public SystemRestliEntityClient(
@Nonnull final BackoffPolicy backoffPolicy,
int retryCount,
EntityClientCacheConfig cacheConfig,
- int batchGetV2Size) {
- super(restliClient, backoffPolicy, retryCount, batchGetV2Size);
+ int batchGetV2Size,
+ int batchGetV2Concurrency) {
+ super(restliClient, backoffPolicy, retryCount, batchGetV2Size, batchGetV2Concurrency);
this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
this.entityClientCache = buildEntityClientCache(SystemRestliEntityClient.class, cacheConfig);
}
diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java
index 474bb24f9e16b..797ead10c1a66 100644
--- a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java
+++ b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java
@@ -37,7 +37,7 @@ public void testZeroRetry() throws RemoteInvocationException {
when(mockRestliClient.sendRequest(any(ActionRequest.class))).thenReturn(mockFuture);
RestliEntityClient testClient =
- new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0, 10);
+ new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0, 10, 2);
testClient.sendClientRequest(testRequestBuilder, AUTH);
// Expected 1 actual try and 0 retries
verify(mockRestliClient).sendRequest(any(ActionRequest.class));
@@ -56,7 +56,7 @@ public void testMultipleRetries() throws RemoteInvocationException {
.thenReturn(mockFuture);
RestliEntityClient testClient =
- new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10);
+ new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10, 2);
testClient.sendClientRequest(testRequestBuilder, AUTH);
// Expected 1 actual try and 1 retries
verify(mockRestliClient, times(2)).sendRequest(any(ActionRequest.class));
@@ -73,7 +73,7 @@ public void testNonRetry() {
.thenThrow(new RuntimeException(new RequiredFieldNotPresentException("value")));
RestliEntityClient testClient =
- new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10);
+ new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10, 2);
assertThrows(
RuntimeException.class, () -> testClient.sendClientRequest(testRequestBuilder, AUTH));
}
diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java
index 75614ca998f6a..e6d53fc98e2e3 100644
--- a/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java
+++ b/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java
@@ -45,7 +45,8 @@ public void testCache() throws RemoteInvocationException, URISyntaxException {
noCacheConfig.setEnabled(true);
SystemRestliEntityClient noCacheTest =
- new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1);
+ new SystemRestliEntityClient(
+ mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2);
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
@@ -83,7 +84,8 @@ public void testCache() throws RemoteInvocationException, URISyntaxException {
Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60)));
SystemRestliEntityClient cacheTest =
- new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1);
+ new SystemRestliEntityClient(
+ mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2);
mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(
@@ -117,7 +119,8 @@ public void testBatchCache() throws RemoteInvocationException, URISyntaxExceptio
noCacheConfig.setEnabled(true);
SystemRestliEntityClient noCacheTest =
- new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1);
+ new SystemRestliEntityClient(
+ mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2);
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
@@ -155,7 +158,8 @@ public void testBatchCache() throws RemoteInvocationException, URISyntaxExceptio
Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60)));
SystemRestliEntityClient cacheTest =
- new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1);
+ new SystemRestliEntityClient(
+ mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2);
mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java
index 3f9022b634c67..dc533e4aa5de5 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java
@@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
@@ -61,8 +62,9 @@ protected Map getTagsAspects(
return finalResult;
} catch (Exception e) {
log.error(
- "Error retrieving global tags for entities. Entities: {} aspect: {}",
- entityUrns,
+ "Error retrieving global tags for {} entities. Sample Urns: {} aspect: {}",
+ entityUrns.size(),
+ entityUrns.stream().limit(10).collect(Collectors.toList()),
Constants.GLOSSARY_TERMS_ASPECT_NAME,
e);
return Collections.emptyMap();