Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ 3° ] - Release/v1.4.0 Bugfix/dtr lookup: Fixed bugs related to negotiation timeout and "gost" contract id #148

Merged
4 changes: 2 additions & 2 deletions charts/digital-product-pass/templates/deployment-backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ spec:
- name: backend-config
mountPath: /app/config
- name: pvc-backend
mountPath: /app/data
subPath: data
mountPath: /app/data/process
subPath: data/process
- name: pvc-backend
mountPath: /app/log
subPath: log
Expand Down
14 changes: 7 additions & 7 deletions charts/digital-product-pass/values-beta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ backend:
hosts:
- materialpass.beta.demo.catena-x.net

avp:
helm:
clientId: <path:material-pass/data/beta/backend#clientId>
clientSecret: <path:material-pass/data/beta/backend#clientSecret>
xApiKey: <path:material-pass/data/beta/edc/oauth#api.key>
participantId: <path:material-pass/data/beta/edc/participant#bpnNumber>
edc:
clientId: <path:material-pass/data/beta/backend#clientId>
clientSecret: <path:material-pass/data/beta/backend#clientSecret>
xApiKey: <path:material-pass/data/beta/edc/oauth#api.key>
participantId: <path:material-pass/data/beta/edc/participant#bpnNumber>

application:
yml: |-
Expand Down Expand Up @@ -148,7 +147,8 @@ backend:
subModel: "/submodel-descriptors"
timeouts:
search: 10
negotiation: 40
negotiation: 15
dtrRequestProcess: 40
transfer: 10
digitalTwin: 20
temporaryStorage: true
Expand Down
14 changes: 7 additions & 7 deletions charts/digital-product-pass/values-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ backend:
hosts:
- materialpass.dev.demo.catena-x.net

avp:
helm:
clientId: <path:material-pass/data/dev/backend#clientId>
clientSecret: <path:material-pass/data/dev/backend#clientSecret>
xApiKey: <path:material-pass/data/dev/edc/oauth#api.key>
participantId: <path:material-pass/data/dev/edc/participant#bpnNumber>
edc:
clientId: <path:material-pass/data/dev/backend#clientId>
clientSecret: <path:material-pass/data/dev/backend#clientSecret>
xApiKey: <path:material-pass/data/dev/edc/oauth#api.key>
participantId: <path:material-pass/data/dev/edc/participant#bpnNumber>

application:
yml: |-
Expand Down Expand Up @@ -148,7 +147,8 @@ backend:
subModel: "/submodel-descriptors"
timeouts:
search: 10
negotiation: 40
negotiation: 15
dtrRequestProcess: 40
transfer: 10
digitalTwin: 20
temporaryStorage: true
Expand Down
14 changes: 7 additions & 7 deletions charts/digital-product-pass/values-int.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,11 @@ backend:
hosts:
- materialpass.int.demo.catena-x.net

avp:
helm:
clientId: <path:material-pass/data/int/backend#clientId>
clientSecret: <path:material-pass/data/int/backend#clientSecret>
xApiKey: <path:material-pass/data/int/edc/oauth#api.key>
participantId: <path:material-pass/data/int/edc/participant#bpnNumber>
edc:
clientId: <path:material-pass/data/int/backend#clientId>
clientSecret: <path:material-pass/data/int/backend#clientSecret>
xApiKey: <path:material-pass/data/int/edc/oauth#api.key>
participantId: <path:material-pass/data/int/edc/participant#bpnNumber>

application:
yml: |-
Expand Down Expand Up @@ -147,7 +146,8 @@ backend:
subModel: "/submodel-descriptors"
timeouts:
search: 10
negotiation: 40
negotiation: 15
dtrRequestProcess: 40
transfer: 10
digitalTwin: 20
temporaryStorage: true
Expand Down
3 changes: 2 additions & 1 deletion charts/digital-product-pass/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ backend:
# -- timeouts for the digital twin registry async negotiation
timeouts:
search: 10
negotiation: 40
negotiation: 15
dtrRequestProcess: 40
transfer: 10
digitalTwin: 20
# -- temporary storage of dDTRs for optimization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public static class Timeouts{
Integer negotiation;
Integer transfer;
Integer digitalTwin;
Integer dtrRequestProcess;

/** GETTERS AND SETTERS **/
public Integer getSearch() {
Expand Down Expand Up @@ -179,6 +180,14 @@ public Integer getDigitalTwin() {
public void setDigitalTwin(Integer digitalTwin) {
this.digitalTwin = digitalTwin;
}

public Integer getDtrRequestProcess() {
return dtrRequestProcess;
}

public void setDtrRequestProcess(Integer dtrRequestProcess) {
this.dtrRequestProcess = dtrRequestProcess;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public Response create(@Valid @RequestBody DiscoverySearch searchBody) {
return httpUtil.buildResponse(response, httpResponse);
}
String processId = processManager.initProcess();
LogUtil.printMessage("Creating process [" + processId + "] for "+searchBody.getType() + ": "+ searchBody.getId());
ConcurrentHashMap<String, List<Dtr>> dataModel = null;
if(dtrConfig.getTemporaryStorage().getEnabled()) {
try {
Expand Down Expand Up @@ -187,7 +188,7 @@ public Response create(@Valid @RequestBody DiscoverySearch searchBody) {
for(Dtr dtr: dtrs){

Long validUntil = dtr.getValidUntil();
if(validUntil == null || validUntil < currentTimestamp){
if(dtr.getContractId() == null || dtr.getContractId().isEmpty() || validUntil == null || validUntil < currentTimestamp){
requestDtrs = true; // If the cache invalidation time has come request Dtrs
break;
}
Expand Down Expand Up @@ -270,7 +271,7 @@ public Response search(@Valid @RequestBody Search searchBody) {
response = httpUtil.getBadRequest("No processId was found on the request body!");
return httpUtil.buildResponse(response, httpResponse);
}

String processId = searchBody.getProcessId();
if(processId.isEmpty()){
response = httpUtil.getBadRequest("Process id is required for decentral digital twin registry searches!");
Expand All @@ -286,9 +287,12 @@ public Response search(@Valid @RequestBody Search searchBody) {
return httpUtil.buildResponse(response, httpResponse);
}
Boolean childrenCondition = searchBody.getChildren();
String logPrint = "[" + processId + "] Creating search for "+searchBody.getIdType() + ": "+ searchBody.getId();
if(childrenCondition != null){
LogUtil.printMessage(logPrint + " with drilldown enabled");
process = processManager.createProcess(processId, childrenCondition, httpRequest); // Store the children condition
}else {
LogUtil.printMessage(logPrint + " with drilldown disabled");
process = processManager.createProcess(processId, httpRequest);
}
Status status = processManager.getStatus(processId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class DtrSearchManager {
private ConcurrentHashMap<String, List<Dtr>> dtrDataModel;
private ConcurrentHashMap<String, Catalog> catalogsCache;
private final long searchTimeoutSeconds;
private final long negotiationTimeoutSeconds;
private final long dtrRequestProcessTimeout;
private final String fileName = "dtrDataModel.json";
private String dtrDataModelFilePath;
private State state;
Expand All @@ -93,8 +93,7 @@ public DtrSearchManager(FileUtil fileUtil, JsonUtil jsonUtil, DataTransferServic
this.dtrDataModelFilePath = this.createDataModelFile();
this.dtrDataModel = this.loadDtrDataModel();
this.searchTimeoutSeconds = this.dtrConfig.getTimeouts().getSearch();
this.negotiationTimeoutSeconds = this.dtrConfig.getTimeouts().getNegotiation();

this.dtrRequestProcessTimeout = this.dtrConfig.getTimeouts().getDtrRequestProcess();
}

/** GETTERS AND SETTERS **/
Expand Down Expand Up @@ -205,7 +204,7 @@ public void run() {
}
public void searchEndpoint(String processId, String bpn, String endpoint){
//Search Digital Twin Catalog for each connectionURL with a timeout time
Thread asyncThread = ThreadUtil.runThread(searchDigitalTwinCatalogExecutor(endpoint), "ProcessDtrDataModel");
Thread asyncThread = ThreadUtil.runThread(searchDigitalTwinCatalogExecutor(endpoint), "SearchEndpoint"+processId+"-"+bpn+"-"+endpoint);
try {
if (!asyncThread.join(Duration.ofSeconds(searchTimeoutSeconds))) {
asyncThread.interrupt();
Expand All @@ -228,11 +227,11 @@ public void searchEndpoint(String processId, String bpn, String endpoint){
if (contractOffers instanceof LinkedHashMap) {
Dataset dataset = (Dataset) jsonUtil.bindObject(contractOffers, Dataset.class);
if (dataset != null) {
Thread singleOfferThread = ThreadUtil.runThread(createAndSaveDtr(dataset, bpn, endpoint, processId), "CreateAndSaveDtr");
Thread singleOfferThread = ThreadUtil.runThread(createAndSaveDtr(dataset, bpn, endpoint, processId), "CreateAndSaveDtr-"+processId+"-"+bpn+"-"+endpoint);
try {
if (!singleOfferThread.join(Duration.ofSeconds(negotiationTimeoutSeconds))) {
if (!singleOfferThread.join(Duration.ofSeconds(this.dtrRequestProcessTimeout))) {
singleOfferThread.interrupt();
LogUtil.printWarning("Failed to retrieve the Catalog due a timeout for the URL: " + endpoint);
LogUtil.printWarning("Failed to retrieve do contract negotiations due a timeout for the URL: " + endpoint);
return;
}
} catch (InterruptedException e) {
Expand All @@ -246,11 +245,11 @@ public void searchEndpoint(String processId, String bpn, String endpoint){
return;
}
contractOfferList.parallelStream().forEach(dataset -> {
Thread multipleOffersThread = ThreadUtil.runThread(createAndSaveDtr(dataset, bpn, endpoint, processId), "CreateAndSaveDtr");
Thread multipleOffersThread = ThreadUtil.runThread(createAndSaveDtr(dataset, bpn, endpoint, processId), "CreateAndSaveDtr-"+processId+"-"+bpn+"-"+endpoint);
try {
if (!multipleOffersThread.join(Duration.ofSeconds(negotiationTimeoutSeconds))) {
if (!multipleOffersThread.join(Duration.ofSeconds(this.dtrRequestProcessTimeout))) {
multipleOffersThread.interrupt();
LogUtil.printWarning("Failed to retrieve the Catalog due a timeout for the URL: " + endpoint);
LogUtil.printWarning("Failed to retrieve the contract negotiations due a timeout for the URL: " + endpoint);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -426,16 +425,22 @@ private Runnable createAndSaveDtr(Dataset dataset, String bpn, String connection
public void run() {
try {
Offer offer = dataTransferService.buildOffer(dataset, 0);
String builtDataEndpoint =CatenaXUtil.buildDataEndpoint(connectionUrl);
String builtDataEndpoint = CatenaXUtil.buildDataEndpoint(connectionUrl);
IdResponse negotiationResponse = dataTransferService.doContractNegotiation(offer, bpn, builtDataEndpoint);
if (negotiationResponse == null) {
return;
}
Negotiation negotiation = dataTransferService.seeNegotiation(negotiationResponse.getId());
Integer millis = dtrConfig.getTimeouts().getNegotiation() * 1000; // Set max timeout from seconds to milliseconds
// If negotiation takes way too much time give timeout
Negotiation negotiation = ThreadUtil.timeout(millis, ()->dataTransferService.seeNegotiation(negotiationResponse.getId()), null);
if (negotiation == null) {
LogUtil.printWarning("It was not possible to do ContractNegotiation for URL: " + connectionUrl);
return;
}
if(negotiation.getContractAgreementId() == null || negotiation.getContractAgreementId().isEmpty()){
LogUtil.printError("It was not possible to get an Contract Agreemment Id for the URL: " + connectionUrl);
return;
}
Dtr dtr = new Dtr(negotiation.getContractAgreementId(), connectionUrl, offer.getAssetId(), bpn, DateTimeUtil.addHoursToCurrentTimestamp(dtrConfig.getTemporaryStorage().getLifetime()));
if (dtrConfig.getTemporaryStorage().getEnabled()) {
addConnectionToBpnEntry(bpn, dtr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ public static <V> V timeout(Integer milliseconds, Callable<V> function, V timeou
{
try {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<V> future = executor.submit(function);
boolean timeout = false;
V returnObject = null;
try {
Future<V> future = executor.submit(function);
returnObject = future.get(milliseconds, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
} catch (Exception e) {
timeout = true;
}
executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ configuration:
subModel: "/submodel-descriptors"
timeouts:
search: 10
negotiation: 40
negotiation: 10
dtrRequestProcess: 40
transfer: 10
digitalTwin: 20
temporaryStorage:
Expand Down
Loading