Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LRA Custom headers propagation #3702 (#3768) #3877

Merged
merged 2 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import io.helidon.common.http.Headers;
import io.helidon.common.http.Http;
import io.helidon.common.reactive.Single;
import io.helidon.faulttolerance.Retry;
import io.helidon.lra.coordinator.client.CoordinatorClient;
import io.helidon.lra.coordinator.client.CoordinatorConnectionException;
import io.helidon.lra.coordinator.client.Participant;
import io.helidon.lra.coordinator.client.PropagatedHeaders;
import io.helidon.webclient.WebClient;
import io.helidon.webclient.WebClientRequestHeaders;
import io.helidon.webclient.WebClientResponse;

import org.eclipse.microprofile.lra.annotation.LRAStatus;
Expand Down Expand Up @@ -75,16 +79,16 @@ public void init(Supplier<URI> coordinatorUriSupplier, long timeout, TimeUnit ti
}

@Override
public Single<URI> start(String clientID, long timeout) {
return startInternal(null, clientID, timeout);
public Single<URI> start(String clientID, PropagatedHeaders headers, long timeout) {
return startInternal(null, clientID, headers, timeout);
}

@Override
public Single<URI> start(URI parentLRAUri, String clientID, long timeout) {
return startInternal(parentLRAUri, clientID, timeout);
public Single<URI> start(URI parentLRAUri, String clientID, PropagatedHeaders headers, long timeout) {
return startInternal(parentLRAUri, clientID, headers, timeout);
}

private Single<URI> startInternal(URI parentLRA, String clientID, long timeout) {
private Single<URI> startInternal(URI parentLRA, String clientID, PropagatedHeaders headers, long timeout) {
// We need to call coordinator which knows parent LRA
URI baseUri = Optional.ofNullable(parentLRA)
.map(p -> parseBaseUri(p.toASCIIString()))
Expand All @@ -93,6 +97,7 @@ private Single<URI> startInternal(URI parentLRA, String clientID, long timeout)
return retry.invoke(() -> prepareWebClient(baseUri)
.post()
.path("start")
.headers(copyHeaders(headers)) // header propagation
.queryParam(QUERY_PARAM_CLIENT_ID, Optional.ofNullable(clientID).orElse(""))
.queryParam(QUERY_PARAM_TIME_LIMIT, String.valueOf(timeout))
.queryParam(QUERY_PARAM_PARENT_LRA, parentLRA == null ? "" : parentLRA.toASCIIString())
Expand All @@ -104,13 +109,15 @@ private Single<URI> startInternal(URI parentLRA, String clientID, long timeout)
connectionError("Unexpected response " + status + " from coordinator "
+ res.lastEndpointURI() + ": " + cont, null));
} else {
//propagate supported headers from coordinator
headers.scan(res.headers().toMap());
return Single.just(res);
}
})
.map(res -> res
.headers()
.location()
// TRM doesn't send lraId as LOCATION
// TMM doesn't send lraId as LOCATION
.or(() -> res.headers()
.first(LRA_HTTP_CONTEXT_HEADER)
.map(URI::create))
Expand All @@ -125,10 +132,11 @@ private Single<URI> startInternal(URI parentLRA, String clientID, long timeout)
}

@Override
public Single<Void> cancel(URI lraId) {
public Single<Void> cancel(URI lraId, PropagatedHeaders headers) {
return retry.invoke(() -> prepareWebClient(lraId)
.put()
.path("/cancel")
.headers(copyHeaders(headers)) // header propagation
.submit()
.map(WebClientResponse::status)
.flatMap(status -> {
Expand All @@ -152,10 +160,11 @@ public Single<Void> cancel(URI lraId) {
}

@Override
public Single<Void> close(URI lraId) {
public Single<Void> close(URI lraId, PropagatedHeaders headers) {
return retry.invoke(() -> prepareWebClient(lraId)
.put()
.path("/close")
.headers(copyHeaders(headers)) // header propagation
.submit()
.map(WebClientResponse::status)
.flatMap(status -> {
Expand All @@ -181,6 +190,7 @@ public Single<Void> close(URI lraId) {

@Override
public Single<Optional<URI>> join(URI lraId,
PropagatedHeaders headers,
long timeLimit,
Participant p) {
String links = compensatorLinks(p);
Expand All @@ -190,6 +200,7 @@ public Single<Optional<URI>> join(URI lraId,
.queryParam(QUERY_PARAM_TIME_LIMIT, String.valueOf(timeLimit))
.headers(h -> {
h.add(HEADER_LINK, links); // links are expected either in header
headers.toMap().forEach(h::add); // header propagation
return h;
})
.submit(links) // or as a body
Expand Down Expand Up @@ -218,10 +229,11 @@ public Single<Optional<URI>> join(URI lraId,
}

@Override
public Single<Void> leave(URI lraId, Participant p) {
public Single<Void> leave(URI lraId, PropagatedHeaders headers, Participant p) {
return retry.invoke(() -> prepareWebClient(lraId)
.put()
.path("/remove")
.headers(copyHeaders(headers)) // header propagation
.submit(compensatorLinks(p))
.flatMap(res -> {
switch (res.status().code()) {
Expand All @@ -245,10 +257,11 @@ public Single<Void> leave(URI lraId, Participant p) {


@Override
public Single<LRAStatus> status(URI lraId) {
public Single<LRAStatus> status(URI lraId, PropagatedHeaders headers) {
return retry.invoke(() -> prepareWebClient(lraId)
.get()
.path("/status")
.headers(copyHeaders(headers)) // header propagation
.request()
.flatMap(res -> {
switch (res.status().code()) {
Expand Down Expand Up @@ -328,6 +341,13 @@ static URI parseBaseUri(String lraUri) {
return URI.create(m.group(1));
}

private Function<WebClientRequestHeaders, Headers> copyHeaders(PropagatedHeaders headers) {
return wcHeaders -> {
headers.toMap().forEach(wcHeaders::add);
return wcHeaders;
};
}

private <T> Single<T> connectionError(String message, int status) {
LOGGER.warning(message);
return Single.error(new CoordinatorConnectionException(message, status));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
* Copyright (c) 2021, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,11 @@
*/
public interface CoordinatorClient {

/**
* Prefix of headers which should be propagated to the coordinator.
*/
String CONF_KEY_COORDINATOR_HEADERS_PROPAGATION_PREFIX = "mp.lra.coordinator.headers-propagation.prefix";

/**
* URL of the coordinator to be used for orchestrating Long Running Actions.
*/
Expand Down Expand Up @@ -63,60 +68,164 @@ public interface CoordinatorClient {
* @param clientID id specifying originating method/resource
* @param timeout after what time should be LRA cancelled automatically
* @return id of the new LRA
* @deprecated Use {@link io.helidon.lra.coordinator.client.CoordinatorClient#start(String, PropagatedHeaders, long)} instead
*/
@Deprecated
default Single<URI> start(String clientID, long timeout) {
return start(clientID, PropagatedHeaders.noop(), timeout);
}

/**
* Ask coordinator to start new LRA and return its id.
*
* @param clientID id specifying originating method/resource
* @param headers headers to be propagated to the coordinator
* @param timeout after what time should be LRA cancelled automatically
* @return id of the new LRA
*/
Single<URI> start(String clientID, PropagatedHeaders headers, long timeout);

/**
* Ask coordinator to start new LRA and return its id.
*
* @param parentLRA in case new LRA should be a child of already existing one
* @param clientID id specifying originating method/resource
* @param timeout after what time should be LRA cancelled automatically
* @return id of the new LRA
* @deprecated Use
* {@link io.helidon.lra.coordinator.client.CoordinatorClient#start(java.net.URI, String, PropagatedHeaders, long)} instead
*/
Single<URI> start(String clientID, long timeout);
@Deprecated
default Single<URI> start(URI parentLRA, String clientID, long timeout) {
return start(parentLRA, clientID, PropagatedHeaders.noop(), timeout);
}

/**
* Ask coordinator to start new LRA and return its id.
*
* @param parentLRA in case new LRA should be a child of already existing one
* @param clientID id specifying originating method/resource
* @param headers headers to be propagated to the coordinator
* @param timeout after what time should be LRA cancelled automatically
* @return id of the new LRA
*/
Single<URI> start(URI parentLRA, String clientID, long timeout);
Single<URI> start(URI parentLRA, String clientID, PropagatedHeaders headers, long timeout);

/**
* Join existing LRA with participant.
*
* @param lraId id of existing LRA
* @param timeLimit time limit in milliseconds after which should be LRA cancelled, 0 means never
* @param participant participant metadata with URLs to be called when complete/compensate ...
* @return recovery URI if supported by coordinator or empty
* @deprecated Use
* {@link io.helidon.lra.coordinator.client.CoordinatorClient#join(java.net.URI, PropagatedHeaders, long, Participant)} instead
*/
@Deprecated
default Single<Optional<URI>> join(URI lraId, long timeLimit, Participant participant) {
return join(lraId, PropagatedHeaders.noop(), timeLimit, participant);
}

/**
* Join existing LRA with participant.
*
* @param lraId id of existing LRA
* @param headers headers to be propagated to the coordinator
* @param timeLimit time limit in milliseconds after which should be LRA cancelled, 0 means never
* @param participant participant metadata with URLs to be called when complete/compensate ...
* @return recovery URI if supported by coordinator or empty
*/
Single<Optional<URI>> join(URI lraId, long timeLimit, Participant participant);
Single<Optional<URI>> join(URI lraId, PropagatedHeaders headers, long timeLimit, Participant participant);

/**
* Cancel LRA if its active. Should cause coordinator to compensate its participants.
*
* @param lraId id of the LRA to be cancelled
* @return single future of the cancel call
* @deprecated Use
* {@link io.helidon.lra.coordinator.client.CoordinatorClient#cancel(java.net.URI, PropagatedHeaders)} instead
*/
@Deprecated
default Single<Void> cancel(URI lraId) {
return cancel(lraId, PropagatedHeaders.noop());
}

/**
* Cancel LRA if its active. Should cause coordinator to compensate its participants.
*
* @param lraId id of the LRA to be cancelled
* @param headers headers to be propagated to the coordinator
* @return single future of the cancel call
*/
Single<Void> cancel(URI lraId);
Single<Void> cancel(URI lraId, PropagatedHeaders headers);

/**
* Close LRA if its active. Should cause coordinator to complete its participants.
*
* @param lraId id of the LRA to be closed
* @return single future of the cancel call
* @deprecated Use
* {@link io.helidon.lra.coordinator.client.CoordinatorClient#close(java.net.URI, PropagatedHeaders)} instead
*/
@Deprecated
default Single<Void> close(URI lraId) {
return close(lraId, PropagatedHeaders.noop());
}

/**
* Close LRA if its active. Should cause coordinator to complete its participants.
*
* @param lraId id of the LRA to be closed
* @param headers headers to be propagated to the coordinator
* @return single future of the cancel call
*/
Single<Void> close(URI lraId, PropagatedHeaders headers);

/**
* Leave LRA. Supplied participant won't be part of specified LRA any more,
* no compensation or completion will be executed on it.
*
* @param lraId id of the LRA that should be left by supplied participant
* @param participant participant which will leave
* @return single future of the cancel call
* @deprecated Use
* {@link io.helidon.lra.coordinator.client.CoordinatorClient#leave(java.net.URI, PropagatedHeaders, Participant)} instead
*/
Single<Void> close(URI lraId);
@Deprecated
default Single<Void> leave(URI lraId, Participant participant) {
return leave(lraId, PropagatedHeaders.noop(), participant);
}

/**
* Leave LRA. Supplied participant won't be part of specified LRA any more,
* no compensation or completion will be executed on it.
*
* @param lraId id of the LRA that should be left by supplied participant
* @param headers headers to be propagated to the coordinator
* @param participant participant which will leave
* @return single future of the cancel call
*/
Single<Void> leave(URI lraId, Participant participant);
Single<Void> leave(URI lraId, PropagatedHeaders headers, Participant participant);

/**
* Return status of specified LRA.
*
* @param lraId id of the queried LRA
* @return {@link org.eclipse.microprofile.lra.annotation.LRAStatus} of the queried LRA
* @deprecated Use
* {@link io.helidon.lra.coordinator.client.CoordinatorClient#status(java.net.URI, PropagatedHeaders)} instead
*/
@Deprecated
default Single<LRAStatus> status(URI lraId) {
return status(lraId, PropagatedHeaders.noop());
}

/**
* Return status of specified LRA.
*
* @param lraId id of the queried LRA
* @param headers headers to be propagated to the coordinator
* @return {@link org.eclipse.microprofile.lra.annotation.LRAStatus} of the queried LRA
*/
Single<LRAStatus> status(URI lraId);
Single<LRAStatus> status(URI lraId, PropagatedHeaders headers);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
* Copyright (c) 2021, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
/**
* Abstraction over the structure used for sending LRA id by coordinatior.
*/
@Deprecated
public interface Headers {

/**
Expand Down
Loading