Skip to content

Commit

Permalink
Sync NARs from cluster coordinator when joining the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Jun 28, 2024
1 parent da0eea4 commit 28c696c
Show file tree
Hide file tree
Showing 17 changed files with 612 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public enum HttpResponseStatus {

PROXY_AUTHENTICATION_REQUIRED(407),

CONFLICT(409),

INTERNAL_SERVER_ERROR(500),

SERVICE_UNAVAILABLE(503);
Expand All @@ -55,4 +57,12 @@ public enum HttpResponseStatus {
public int getCode() {
return code;
}

public boolean isSuccessful() {
return isSuccessful(code);
}

public static boolean isSuccessful(final int code) {
return code >= 200 && code < 300;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ public class NarSummaryDTO {
private String identifier;
private NarCoordinateDTO coordinate;
private NarCoordinateDTO dependencyCoordinate;

private String buildTime;
private String createdBy;
private String digest;
private String sourceType;
private String sourceIdentifier;

private String state;
private String failureMessage;
Expand Down Expand Up @@ -97,6 +100,24 @@ public void setDigest(final String digest) {
this.digest = digest;
}

@Schema(description = "The source of this NAR")
public String getSourceType() {
return sourceType;
}

public void setSourceType(final String sourceType) {
this.sourceType = sourceType;
}

@Schema(description = "The identifier of the source of this NAR")
public String getSourceIdentifier() {
return sourceIdentifier;
}

public void setSourceIdentifier(final String sourceIdentifier) {
this.sourceIdentifier = sourceIdentifier;
}

@Schema(description = "The state of the NAR (i.e. Installed, or not)")
public String getState() {
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,31 @@
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.client.StandardWebClientService;
import org.apache.nifi.web.client.StandardHttpUriBuilder;
import org.apache.nifi.web.client.api.HttpRequestBodySpec;
import org.apache.nifi.web.client.api.HttpResponseEntity;
import org.apache.nifi.web.client.api.HttpResponseStatus;
import org.apache.nifi.web.client.api.WebClientService;
import org.apache.nifi.web.client.redirect.RedirectHandling;
import org.apache.nifi.web.client.ssl.TlsContext;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* Implementation of {@link UploadRequestReplicator} that uses OkHttp Client.
Expand All @@ -67,56 +57,11 @@ public class StandardUploadRequestReplicator implements UploadRequestReplicator
private static final ObjectMapper objectMapper = new ObjectMapper();

private final ClusterCoordinator clusterCoordinator;
private final NiFiProperties properties;
private final SSLContext sslContext;
private final X509KeyManager keyManager;
private final X509TrustManager trustManager;
private final WebClientService webClientService;

public StandardUploadRequestReplicator(final ClusterCoordinator clusterCoordinator, final NiFiProperties properties, final SSLContext sslContext,
final X509KeyManager keyManager, final X509TrustManager trustManager) {
public StandardUploadRequestReplicator(final ClusterCoordinator clusterCoordinator, final WebClientService webClientService) {
this.clusterCoordinator = Objects.requireNonNull(clusterCoordinator, "Cluster Coordinator is required");
this.properties = Objects.requireNonNull(properties, "NiFiProperties is required");
this.sslContext = sslContext;
this.keyManager = keyManager;
this.trustManager = trustManager;
this.webClientService = createClient();
}

private WebClientService createClient() {
final long readTimeoutMillis = FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS);
final Duration timeout = Duration.ofMillis(readTimeoutMillis);

final StandardWebClientService webClientService = new StandardWebClientService();
webClientService.setConnectTimeout(timeout);
webClientService.setReadTimeout(timeout);
webClientService.setRedirectHandling(RedirectHandling.FOLLOWED);

if (sslContext != null) {
webClientService.setTlsContext(getTlsContext(sslContext, trustManager, keyManager));
}

logger.info("Successfully initialized {}", getClass().getCanonicalName());
return webClientService;
}

private TlsContext getTlsContext(final SSLContext sslContext, final X509TrustManager trustManager, final X509KeyManager keyManager) {
return new TlsContext() {
@Override
public String getProtocol() {
return sslContext.getProtocol();
}

@Override
public X509TrustManager getTrustManager() {
return trustManager;
}

@Override
public Optional<X509KeyManager> getKeyManager() {
return Optional.of(keyManager);
}
};
this.webClientService = Objects.requireNonNull(webClientService, "Web Client Service is required");
}

@Override
Expand Down Expand Up @@ -187,24 +132,20 @@ private <T> Future<T> performUploadAsync(final NodeIdentifier nodeId, final Uplo

private <T> T replicateRequest(final NodeIdentifier nodeId, final UploadRequest<T> uploadRequest, final File contents) throws IOException {
final URI exampleRequestUri = uploadRequest.getExampleRequestUri();
final String schema = exampleRequestUri.getScheme();
final String address = nodeId.getApiAddress();
final int port = nodeId.getApiPort();
final String path = exampleRequestUri.getPath();
final URI uri;
try {
uri = new URI(schema, null, address, port, path, null, null);
} catch (final URISyntaxException e) {
throw new IOException(e);
}

final URI requestUri = new StandardHttpUriBuilder()
.scheme(exampleRequestUri.getScheme())
.host(nodeId.getApiAddress())
.port(nodeId.getApiPort())
.encodedPath(exampleRequestUri.getPath())
.build();

final NiFiUser user = uploadRequest.getUser();
final String filename = uploadRequest.getFilename();
final String uploadId = uploadRequest.getIdentifier();

try (final InputStream inputStream = new FileInputStream(contents)) {
final HttpRequestBodySpec request = webClientService.post()
.uri(uri)
.uri(requestUri)
.body(inputStream, OptionalLong.of(inputStream.available()))
.header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
.header(FILENAME_HEADER, filename)
Expand All @@ -215,17 +156,14 @@ private <T> T replicateRequest(final NodeIdentifier nodeId, final UploadRequest<
.header(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS, ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));

logger.debug("Replicating upload request for {} to {}", filename, nodeId);

try (final HttpResponseEntity response = request.retrieve()) {
final int statusCode = response.statusCode();
if (!(statusCode >= 200 && statusCode < 300)) {
if (!HttpResponseStatus.isSuccessful(statusCode)) {
final String responseMessage = IOUtils.toString(response.body(), StandardCharsets.UTF_8);
throw new UploadRequestReplicationException("Failed to replicate upload request to " + nodeId + " - " + responseMessage, statusCode);
}

final InputStream responseBody = response.body();
if (responseBody == null) {
throw new IOException("Failed to replicate upload request to " + nodeId + ": received a successful response, but the response body was empty");
}
return objectMapper.readValue(responseBody, uploadRequest.getResponseClass());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.client.api.WebClientService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.net.ssl.SSLContext;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;

/**
Expand All @@ -49,6 +49,8 @@ public class FrameworkClusterConfiguration {

private ClusterCoordinator clusterCoordinator;

private WebClientService webClientService;

@Autowired
public void setProperties(final NiFiProperties properties) {
this.properties = properties;
Expand All @@ -69,6 +71,11 @@ public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
}

@Autowired
public void setWebClientService(final WebClientService webClientService) {
this.webClientService = webClientService;
}

@Bean
public ThreadPoolRequestReplicator requestReplicator(
@Autowired(required = false) final SSLContext sslContext,
Expand Down Expand Up @@ -106,15 +113,10 @@ public ClusterDetailsFactory clusterDetailsFactory() {
}

@Bean
public UploadRequestReplicator uploadRequestReplicator(
@Autowired(required = false) final SSLContext sslContext,
@Autowired(required = false) final X509KeyManager keyManager,
@Autowired(required = false) final X509TrustManager trustManager
) {
public UploadRequestReplicator uploadRequestReplicator() {
if (clusterCoordinator == null) {
return null;
}

return new StandardUploadRequestReplicator(clusterCoordinator, properties, sslContext, keyManager, trustManager);
return new StandardUploadRequestReplicator(clusterCoordinator, webClientService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,9 @@ public interface NarManager {
*/
InputStream readNar(String identifier);

/**
* Instructs the NAR Manager to sync it's NARs with the NARs from the cluster coordinator.
*/
void syncWithClusterCoordinator();

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@ public class NarNode {
private final String narFileHexDigest;
private final NarManifest manifest;

private final NarSource source;
private final String sourceIdentifier;

private volatile NarState state;
private volatile String failureMessage;

public NarNode(final String identifier, final File narFile, final String narFileHexDigest, final NarManifest manifest, final NarState state) {
this.identifier = Objects.requireNonNull(identifier);
this.narFile = Objects.requireNonNull(narFile);
this.narFileHexDigest = Objects.requireNonNull(narFileHexDigest);
this.manifest = Objects.requireNonNull(manifest);
this.state = Objects.requireNonNull(state);
private NarNode(final Builder builder) {
this.identifier = Objects.requireNonNull(builder.identifier);
this.narFile = Objects.requireNonNull(builder.narFile);
this.narFileHexDigest = Objects.requireNonNull(builder.narFileHexDigest);
this.manifest = Objects.requireNonNull(builder.manifest);
this.source = Objects.requireNonNull(builder.source);
this.sourceIdentifier = Objects.requireNonNull(builder.sourceIdentifier);
this.state = Objects.requireNonNull(builder.state);
}

public String getIdentifier() {
Expand All @@ -57,6 +62,14 @@ public NarManifest getManifest() {
return manifest;
}

public NarSource getSource() {
return source;
}

public String getSourceIdentifier() {
return sourceIdentifier;
}

public NarState getState() {
return state;
}
Expand Down Expand Up @@ -92,4 +105,63 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(identifier);
}

public static Builder builder() {
return new Builder();
}

public static final class Builder {
private String identifier;
private File narFile;
private String narFileHexDigest;
private NarManifest manifest;
private NarSource source;
private String sourceIdentifier;
private NarState state;
private String failureMessage;

public Builder identifier(final String identifier) {
this.identifier = identifier;
return this;
}

public Builder narFile(final File narFile) {
this.narFile = narFile;
return this;
}

public Builder narFileHexDigest(final String narFileHexDigest) {
this.narFileHexDigest = narFileHexDigest;
return this;
}

public Builder manifest(final NarManifest manifest) {
this.manifest = manifest;
return this;
}

public Builder source(final NarSource source) {
this.source = source;
return this;
}

public Builder sourceIdentifier(final String sourceIdentifier) {
this.sourceIdentifier = sourceIdentifier;
return this;
}

public Builder state(final NarState state) {
this.state = state;
return this;
}

public Builder failureMessage(final String failureMessage) {
this.failureMessage = failureMessage;
return this;
}

public NarNode build() {
return new NarNode(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@
<artifactId>nifi-stateless-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Loading

0 comments on commit 28c696c

Please sign in to comment.