Skip to content

Commit

Permalink
feat(imp):[#214] parallelize edcSubmodelFacade.getEndpointReferenceFo…
Browse files Browse the repository at this point in the history
…rAsset in combination with the actual data request
  • Loading branch information
dsmf committed Jan 15, 2024
1 parent 3624c7e commit 99d6bfb
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
********************************************************************************/
package org.eclipse.tractusx.irs.registryclient.decentral;

import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.supplyAsync;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -104,20 +104,23 @@ private List<AssetAdministrationShellDescriptor> fetchShellDescriptors(final Set
final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn);
calledEndpoints.addAll(connectorEndpoints);

final var service = endpointDataForConnectorsService;

try {
final var endpointDataReferences = endpointDataForConnectorsService.findEndpointDataForConnectors(
connectorEndpoints);
final var futures = endpointDataReferences.stream()
.map(edr -> supplyAsync(
() -> fetchShellDescriptorsForKey(keys, edr)))
.toList();

final var futures = //
service.findEndpointDataForConnectors(connectorEndpoints)
.stream()
.map(edrFuture -> edrFuture.thenCompose(
edr -> supplyAsync(() -> fetchShellDescriptorsForKey(keys, edr))))
.toList();

return resultFinder.getFastestResult(futures).get();

} catch (InterruptedException e) {
log.debug("InterruptedException occurred while fetching shells for bpn '%s'".formatted(bpn), e);
Thread.currentThread().interrupt();
return Collections.emptyList();
return emptyList();
} catch (ExecutionException e) {
throw new RegistryServiceRuntimeException(
"Exception occurred while fetching shells for bpn '%s'".formatted(bpn), e);
Expand Down Expand Up @@ -164,12 +167,13 @@ private String mapToShellId(final EndpointDataReference endpointDataReference, f
private Collection<String> lookupShellIds(final String bpn) {
log.info("Looking up shell ids for bpn {}", bpn);
final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn);
final var endpointDataReferences = endpointDataForConnectorsService.findEndpointDataForConnectors(
connectorEndpoints);
final var edrFutures = endpointDataForConnectorsService.findEndpointDataForConnectors(connectorEndpoints);

try {
final var futures = endpointDataReferences.stream().map(edr -> supplyAsync(() -> lookupShellIds(bpn, edr)))
.toList();
final var futures = edrFutures.stream()
.map(edrFuture -> edrFuture.thenCompose(
edr -> supplyAsync(() -> lookupShellIds(bpn, edr))))
.toList();
final var shellIds = resultFinder.getFastestResult(futures).get();

log.info("Found {} shell id(s) in total", shellIds.size());
Expand All @@ -178,7 +182,7 @@ private Collection<String> lookupShellIds(final String bpn) {
} catch (InterruptedException e) {
log.debug("InterruptedException occurred while looking up shells ids for bpn '%s'".formatted(bpn), e);
Thread.currentThread().interrupt();
return Collections.emptyList();
return emptyList();
} catch (ExecutionException e) {
throw new RegistryServiceRuntimeException(
"Exception occurred while looking up shell ids for bpn '%s'".formatted(bpn), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
package org.eclipse.tractusx.irs.registryclient.decentral;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.springframework.web.client.RestClientException;

/**
* Service that use edc client to make calls to edc connector endpoints
Expand All @@ -44,32 +44,25 @@ public class EndpointDataForConnectorsService {

private final EdcEndpointReferenceRetriever edcSubmodelFacade;

public List<EndpointDataReference> findEndpointDataForConnectors(final List<String> connectorEndpoints) {
public List<CompletableFuture<EndpointDataReference>> findEndpointDataForConnectors(
final List<String> connectorEndpoints) {

final List<EndpointDataReference> endpointDataReferences = //
connectorEndpoints.stream().parallel() //
.map(this::getEndpointReferenceForAsset) //
.filter(Optional::isPresent) //
.map(Optional::get) //
.map(EndpointDataReference.class::cast).toList();

if (endpointDataReferences.isEmpty()) {
throw new RestClientException("EndpointDataReference was not found. Requested connectorEndpoints: " //
+ String.join(", ", connectorEndpoints));
} else {
return endpointDataReferences;
}
return connectorEndpoints.stream()
.map(connectorEndpoint -> CompletableFuture.supplyAsync(
() -> getEndpointReferenceForAsset(connectorEndpoint)))
.toList();
}

private Optional<?> getEndpointReferenceForAsset(final String connector) {
private EndpointDataReference getEndpointReferenceForAsset(final String connector) {
log.info("Trying to retrieve EndpointDataReference for connector {}", connector);
try {
return Optional.ofNullable(edcSubmodelFacade.getEndpointReferenceForAsset(connector, DT_REGISTRY_ASSET_TYPE,
DT_REGISTRY_ASSET_VALUE));
return edcSubmodelFacade.getEndpointReferenceForAsset(connector, DT_REGISTRY_ASSET_TYPE,
DT_REGISTRY_ASSET_VALUE);
} catch (EdcRetrieverException e) {
log.warn("Exception occurred when retrieving EndpointDataReference from connector {}", connector, e);
return Optional.empty();
throw new CompletionException(e.getMessage(), e);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.concurrent.ExecutionException;

import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelClient;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelFacade;
import org.eclipse.tractusx.irs.edc.client.exceptions.EdcClientException;
import org.eclipse.tractusx.irs.registryclient.decentral.EdcRetrieverException;
import org.junit.jupiter.api.Test;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

class DefaultConfigurationTest {
Expand Down Expand Up @@ -85,7 +86,15 @@ void endpointDataForConnectorsService() throws EdcClientException {

// ACT
final var endpointDataForConnectorsService = testee.endpointDataForConnectorsService(mock);
endpointDataForConnectorsService.findEndpointDataForConnectors(List.of(endpointAddress));

endpointDataForConnectorsService.findEndpointDataForConnectors(List.of(endpointAddress)) //
.forEach(future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

// ASSERT
verify(mock).getEndpointReferenceForAsset(eq(endpointAddress), any(), any());
Expand All @@ -98,9 +107,10 @@ void endpointDataForConnectorsService_withException() throws EdcClientException

final var endpointDataForConnectorsService = testee.endpointDataForConnectorsService(mock);
final var dummyEndpoints = List.of("test");
assertThatThrownBy(
() -> endpointDataForConnectorsService.findEndpointDataForConnectors(dummyEndpoints)).isInstanceOf(
RestClientException.class);

endpointDataForConnectorsService.findEndpointDataForConnectors(dummyEndpoints).forEach(future -> {
assertThatThrownBy(future::get).isInstanceOf(ExecutionException.class)
.extracting(Throwable::getCause)
.isInstanceOf(EdcRetrieverException.class);
});
}
}
Loading

0 comments on commit 99d6bfb

Please sign in to comment.