Skip to content

Commit

Permalink
feat(impl):[#214] undo f7f4f3b
Browse files Browse the repository at this point in the history
  • Loading branch information
dsmf committed Dec 22, 2023
1 parent 26daa2a commit a614f98
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
********************************************************************************/
package org.eclipse.tractusx.irs.registryclient.decentral;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -47,68 +44,18 @@ public class EndpointDataForConnectorsService {
private final EdcEndpointReferenceRetriever edcSubmodelFacade;

public EndpointDataReference findEndpointDataForConnectors(final List<String> connectorEndpoints) {
return new FindFastestImpl().findEndpointDataForConnectors(connectorEndpoints);
}

public class FindFastestImpl {

private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
private final List<EndpointDataReference> outputScraper = Collections.synchronizedList(new ArrayList<>());
private final CountDownLatch countDownLatch = new CountDownLatch(1);

public EndpointDataReference findEndpointDataForConnectors(final List<String> connectorEndpoints) {

this.workers.addAll(connectorEndpoints.stream().map(Worker::new).toList());
this.workers.stream().map(Thread::new).toList().forEach(Thread::start);

for (final String connector : connectorEndpoints) {
log.info("Trying to retrieve EndpointDataReference for connector {}", connector);
try {
countDownLatch.await();
} catch (InterruptedException e) { // TODO #214 @mfischer better handling?!
throw new RuntimeException(e);
}

return outputScraper.stream()
.findAny()
.orElseThrow(() -> new RestClientException(
"EndpointDataReference was not found. Requested connectorEndpoints: "
+ String.join(", ", connectorEndpoints)));
}

public class Worker implements Runnable {
private final String connector;

public Worker(String connector) {
this.connector = connector;
}

@Override
public void run() {

log.info("Trying to retrieve EndpointDataReference for connector {}", connector);

try {
final var result = edcSubmodelFacade.getEndpointReferenceForAsset(connector, DT_REGISTRY_ASSET_TYPE,
DT_REGISTRY_ASSET_VALUE);

workers.remove(this);
outputScraper.add(result);
countDownLatch.countDown();

} catch (EdcRetrieverException e) {

workers.remove(this);
log.warn("Exception occurred when retrieving EndpointDataReference from connector {}", connector,
e);

if (noWorkersRemaining()) {
countDownLatch.countDown();
}
}
}

private boolean noWorkersRemaining() {
return workers.isEmpty();
return edcSubmodelFacade.getEndpointReferenceForAsset(connector, DT_REGISTRY_ASSET_TYPE,
DT_REGISTRY_ASSET_VALUE);
} catch (EdcRetrieverException e) {
log.warn("Exception occurred when retrieving EndpointDataReference from connector {}", connector, e);
}
}
throw new RestClientException(
"EndpointDataReference was not found. Requested connectorEndpoints: " + String.join(", ",
connectorEndpoints));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* https://www.apache.org/licenses/LICENSE-2.0. *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Expand Down Expand Up @@ -59,45 +58,39 @@ void shouldReturnExpectedEndpointDataReference() throws EdcRetrieverException {
EndpointDataReference.Builder.newInstance().endpoint(connectionOneAddress).build());

// when
final List<EndpointDataReference> endpointDataReference = endpointDataForConnectorsService.findEndpointDataForConnectors(
final EndpointDataReference endpointDataReference = endpointDataForConnectorsService.findEndpointDataForConnectors(
Collections.singletonList(connectionOneAddress));

// then
assertThat(endpointDataReference).hasSize(1);
assertThat(endpointDataReference.get(0)).isNotNull();
assertThat(endpointDataReference.get(0).getEndpoint()).isEqualTo(connectionOneAddress);
assertThat(endpointDataReference).isNotNull();
assertThat(endpointDataReference.getEndpoint()).isEqualTo(connectionOneAddress);
}

@Test
void shouldReturnExpectedEndpointDataReferenceFromSecondConnectionEndpoint() throws EdcRetrieverException {

{ // given
// a failing answer
when(edcSubmodelFacade.getEndpointReferenceForAsset(connectionOneAddress, DT_REGISTRY_ASSET_TYPE,
DT_REGISTRY_ASSET_VALUE)).thenThrow(edcRetrieverException());
// and a successful answer
when(edcSubmodelFacade.getEndpointReferenceForAsset(connectionTwoAddress, DT_REGISTRY_ASSET_TYPE,
DT_REGISTRY_ASSET_VALUE)).thenReturn(
EndpointDataReference.Builder.newInstance().endpoint(connectionTwoAddress).build());
}
// given
when(edcSubmodelFacade.getEndpointReferenceForAsset(connectionOneAddress, DT_REGISTRY_ASSET_TYPE,
DT_REGISTRY_ASSET_VALUE)).thenThrow(
new EdcRetrieverException(new EdcClientException("EdcClientException")));
when(edcSubmodelFacade.getEndpointReferenceForAsset(connectionTwoAddress, DT_REGISTRY_ASSET_TYPE,
DT_REGISTRY_ASSET_VALUE)).thenReturn(
EndpointDataReference.Builder.newInstance().endpoint(connectionTwoAddress).build());

// when
final List<EndpointDataReference> endpointDataReference = endpointDataForConnectorsService.findEndpointDataForConnectors(
final EndpointDataReference endpointDataReference = endpointDataForConnectorsService.findEndpointDataForConnectors(
List.of(connectionOneAddress, connectionTwoAddress));

// then
assertThat(endpointDataReference).hasSize(1);
assertThat(endpointDataReference.get(0)).isNotNull();
assertThat(endpointDataReference.get(0).getEndpoint()).isEqualTo(connectionTwoAddress);
assertThat(endpointDataReference).isNotNull();
assertThat(endpointDataReference.getEndpoint()).isEqualTo(connectionTwoAddress);
}

@Test
void shouldThrowExceptionWhenConnectorEndpointsNotReachable() throws EdcRetrieverException {

// given #
// all answers failing
// given
when(edcSubmodelFacade.getEndpointReferenceForAsset(anyString(), eq(DT_REGISTRY_ASSET_TYPE),
eq(DT_REGISTRY_ASSET_VALUE))).thenThrow(edcRetrieverException());
eq(DT_REGISTRY_ASSET_VALUE))).thenThrow(
new EdcRetrieverException(new EdcClientException("EdcClientException")));
final List<String> connectorEndpoints = List.of(connectionOneAddress, connectionTwoAddress);

// when + then
Expand All @@ -106,8 +99,4 @@ void shouldThrowExceptionWhenConnectorEndpointsNotReachable() throws EdcRetrieve
RestClientException.class).hasMessageContainingAll(connectionOneAddress, connectionTwoAddress);
}

private static EdcRetrieverException edcRetrieverException() {
return new EdcRetrieverException(new EdcClientException("EdcClientException"));
}

}

0 comments on commit a614f98

Please sign in to comment.