diff --git a/nifi-commons/nifi-web-client-api/src/main/java/org/apache/nifi/web/client/api/HttpResponseStatus.java b/nifi-commons/nifi-web-client-api/src/main/java/org/apache/nifi/web/client/api/HttpResponseStatus.java index 915cf3cb63e7..79fe3b26fc0a 100644 --- a/nifi-commons/nifi-web-client-api/src/main/java/org/apache/nifi/web/client/api/HttpResponseStatus.java +++ b/nifi-commons/nifi-web-client-api/src/main/java/org/apache/nifi/web/client/api/HttpResponseStatus.java @@ -42,6 +42,8 @@ public enum HttpResponseStatus { PROXY_AUTHENTICATION_REQUIRED(407), + CONFLICT(409), + INTERNAL_SERVER_ERROR(500), SERVICE_UNAVAILABLE(503); @@ -55,4 +57,12 @@ public enum HttpResponseStatus { public int getCode() { return code; } + + public boolean isSuccessful() { + return isSuccessful(code); + } + + public static boolean isSuccessful(final int code) { + return code >= 200 && code < 300; + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NarSummaryDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NarSummaryDTO.java index f0b5a0a62416..b7df2ea97196 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NarSummaryDTO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NarSummaryDTO.java @@ -28,9 +28,12 @@ public class NarSummaryDTO { private String identifier; private NarCoordinateDTO coordinate; private NarCoordinateDTO dependencyCoordinate; + private String buildTime; private String createdBy; private String digest; + private String sourceType; + private String sourceIdentifier; private String state; private String failureMessage; @@ -97,6 +100,24 @@ public void setDigest(final String digest) { this.digest = digest; } + @Schema(description = "The source of this NAR") + public String getSourceType() { + return sourceType; + } + + public void setSourceType(final String sourceType) { + this.sourceType = sourceType; + } + + @Schema(description = "The identifier of the source of this NAR") + public String getSourceIdentifier() { + return sourceIdentifier; + } + + public void setSourceIdentifier(final String sourceIdentifier) { + this.sourceIdentifier = sourceIdentifier; + } + @Schema(description = "The state of the NAR (i.e. Installed, or not)") public String getState() { return state; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java index 8d69c793db78..724703b11ad1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java @@ -22,41 +22,31 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.client.StandardWebClientService; +import org.apache.nifi.web.client.StandardHttpUriBuilder; import org.apache.nifi.web.client.api.HttpRequestBodySpec; import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; import org.apache.nifi.web.client.api.WebClientService; -import org.apache.nifi.web.client.redirect.RedirectHandling; -import org.apache.nifi.web.client.ssl.TlsContext; import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.X509KeyManager; -import javax.net.ssl.X509TrustManager; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.StandardCopyOption; -import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; /** * Implementation of {@link UploadRequestReplicator} that uses OkHttp Client. @@ -67,56 +57,11 @@ public class StandardUploadRequestReplicator implements UploadRequestReplicator private static final ObjectMapper objectMapper = new ObjectMapper(); private final ClusterCoordinator clusterCoordinator; - private final NiFiProperties properties; - private final SSLContext sslContext; - private final X509KeyManager keyManager; - private final X509TrustManager trustManager; private final WebClientService webClientService; - public StandardUploadRequestReplicator(final ClusterCoordinator clusterCoordinator, final NiFiProperties properties, final SSLContext sslContext, - final X509KeyManager keyManager, final X509TrustManager trustManager) { + public StandardUploadRequestReplicator(final ClusterCoordinator clusterCoordinator, final WebClientService webClientService) { this.clusterCoordinator = Objects.requireNonNull(clusterCoordinator, "Cluster Coordinator is required"); - this.properties = Objects.requireNonNull(properties, "NiFiProperties is required"); - this.sslContext = sslContext; - this.keyManager = keyManager; - this.trustManager = trustManager; - this.webClientService = createClient(); - } - - private WebClientService createClient() { - final long readTimeoutMillis = FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS); - final Duration timeout = Duration.ofMillis(readTimeoutMillis); - - final StandardWebClientService webClientService = new StandardWebClientService(); - webClientService.setConnectTimeout(timeout); - webClientService.setReadTimeout(timeout); - webClientService.setRedirectHandling(RedirectHandling.FOLLOWED); - - if (sslContext != null) { - webClientService.setTlsContext(getTlsContext(sslContext, trustManager, keyManager)); - } - - logger.info("Successfully initialized {}", getClass().getCanonicalName()); - return webClientService; - } - - private TlsContext getTlsContext(final SSLContext sslContext, final X509TrustManager trustManager, final X509KeyManager keyManager) { - return new TlsContext() { - @Override - public String getProtocol() { - return sslContext.getProtocol(); - } - - @Override - public X509TrustManager getTrustManager() { - return trustManager; - } - - @Override - public Optional getKeyManager() { - return Optional.of(keyManager); - } - }; + this.webClientService = Objects.requireNonNull(webClientService, "Web Client Service is required"); } @Override @@ -187,24 +132,20 @@ private Future performUploadAsync(final NodeIdentifier nodeId, final Uplo private T replicateRequest(final NodeIdentifier nodeId, final UploadRequest uploadRequest, final File contents) throws IOException { final URI exampleRequestUri = uploadRequest.getExampleRequestUri(); - final String schema = exampleRequestUri.getScheme(); - final String address = nodeId.getApiAddress(); - final int port = nodeId.getApiPort(); - final String path = exampleRequestUri.getPath(); - final URI uri; - try { - uri = new URI(schema, null, address, port, path, null, null); - } catch (final URISyntaxException e) { - throw new IOException(e); - } + + final URI requestUri = new StandardHttpUriBuilder() + .scheme(exampleRequestUri.getScheme()) + .host(nodeId.getApiAddress()) + .port(nodeId.getApiPort()) + .encodedPath(exampleRequestUri.getPath()) + .build(); final NiFiUser user = uploadRequest.getUser(); final String filename = uploadRequest.getFilename(); - final String uploadId = uploadRequest.getIdentifier(); try (final InputStream inputStream = new FileInputStream(contents)) { final HttpRequestBodySpec request = webClientService.post() - .uri(uri) + .uri(requestUri) .body(inputStream, OptionalLong.of(inputStream.available())) .header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE) .header(FILENAME_HEADER, filename) @@ -215,17 +156,14 @@ private T replicateRequest(final NodeIdentifier nodeId, final UploadRequest< .header(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS, ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups())); logger.debug("Replicating upload request for {} to {}", filename, nodeId); + try (final HttpResponseEntity response = request.retrieve()) { final int statusCode = response.statusCode(); - if (!(statusCode >= 200 && statusCode < 300)) { + if (!HttpResponseStatus.isSuccessful(statusCode)) { final String responseMessage = IOUtils.toString(response.body(), StandardCharsets.UTF_8); throw new UploadRequestReplicationException("Failed to replicate upload request to " + nodeId + " - " + responseMessage, statusCode); } - final InputStream responseBody = response.body(); - if (responseBody == null) { - throw new IOException("Failed to replicate upload request to " + nodeId + ": received a successful response, but the response body was empty"); - } return objectMapper.readValue(responseBody, uploadRequest.getResponseClass()); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java index 8e2713866f1e..1d5fa25dfed4 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java @@ -28,12 +28,12 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.events.EventReporter; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.client.api.WebClientService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.net.ssl.SSLContext; -import javax.net.ssl.X509KeyManager; import javax.net.ssl.X509TrustManager; /** @@ -49,6 +49,8 @@ public class FrameworkClusterConfiguration { private ClusterCoordinator clusterCoordinator; + private WebClientService webClientService; + @Autowired public void setProperties(final NiFiProperties properties) { this.properties = properties; @@ -69,6 +71,11 @@ public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { this.clusterCoordinator = clusterCoordinator; } + @Autowired + public void setWebClientService(final WebClientService webClientService) { + this.webClientService = webClientService; + } + @Bean public ThreadPoolRequestReplicator requestReplicator( @Autowired(required = false) final SSLContext sslContext, @@ -106,15 +113,10 @@ public ClusterDetailsFactory clusterDetailsFactory() { } @Bean - public UploadRequestReplicator uploadRequestReplicator( - @Autowired(required = false) final SSLContext sslContext, - @Autowired(required = false) final X509KeyManager keyManager, - @Autowired(required = false) final X509TrustManager trustManager - ) { + public UploadRequestReplicator uploadRequestReplicator() { if (clusterCoordinator == null) { return null; } - - return new StandardUploadRequestReplicator(clusterCoordinator, properties, sslContext, keyManager, trustManager); + return new StandardUploadRequestReplicator(clusterCoordinator, webClientService); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarManager.java index 1a96b6932de4..a308e2d704aa 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarManager.java @@ -95,4 +95,9 @@ public interface NarManager { */ InputStream readNar(String identifier); + /** + * Instructs the NAR Manager to sync it's NARs with the NARs from the cluster coordinator. + */ + void syncWithClusterCoordinator(); + } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarNode.java index 2922c0d9d3b3..344042493674 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarNode.java @@ -30,15 +30,20 @@ public class NarNode { private final String narFileHexDigest; private final NarManifest manifest; + private final NarSource source; + private final String sourceIdentifier; + private volatile NarState state; private volatile String failureMessage; - public NarNode(final String identifier, final File narFile, final String narFileHexDigest, final NarManifest manifest, final NarState state) { - this.identifier = Objects.requireNonNull(identifier); - this.narFile = Objects.requireNonNull(narFile); - this.narFileHexDigest = Objects.requireNonNull(narFileHexDigest); - this.manifest = Objects.requireNonNull(manifest); - this.state = Objects.requireNonNull(state); + private NarNode(final Builder builder) { + this.identifier = Objects.requireNonNull(builder.identifier); + this.narFile = Objects.requireNonNull(builder.narFile); + this.narFileHexDigest = Objects.requireNonNull(builder.narFileHexDigest); + this.manifest = Objects.requireNonNull(builder.manifest); + this.source = Objects.requireNonNull(builder.source); + this.sourceIdentifier = Objects.requireNonNull(builder.sourceIdentifier); + this.state = Objects.requireNonNull(builder.state); } public String getIdentifier() { @@ -57,6 +62,14 @@ public NarManifest getManifest() { return manifest; } + public NarSource getSource() { + return source; + } + + public String getSourceIdentifier() { + return sourceIdentifier; + } + public NarState getState() { return state; } @@ -92,4 +105,63 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(identifier); } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private String identifier; + private File narFile; + private String narFileHexDigest; + private NarManifest manifest; + private NarSource source; + private String sourceIdentifier; + private NarState state; + private String failureMessage; + + public Builder identifier(final String identifier) { + this.identifier = identifier; + return this; + } + + public Builder narFile(final File narFile) { + this.narFile = narFile; + return this; + } + + public Builder narFileHexDigest(final String narFileHexDigest) { + this.narFileHexDigest = narFileHexDigest; + return this; + } + + public Builder manifest(final NarManifest manifest) { + this.manifest = manifest; + return this; + } + + public Builder source(final NarSource source) { + this.source = source; + return this; + } + + public Builder sourceIdentifier(final String sourceIdentifier) { + this.sourceIdentifier = sourceIdentifier; + return this; + } + + public Builder state(final NarState state) { + this.state = state; + return this; + } + + public Builder failureMessage(final String failureMessage) { + this.failureMessage = failureMessage; + return this; + } + + public NarNode build() { + return new NarNode(this); + } + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 077b390022fb..8754a6273048 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -384,6 +384,11 @@ nifi-stateless-api 2.0.0-SNAPSHOT + + org.apache.nifi + nifi-web-client + 2.0.0-SNAPSHOT + diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 4b654b3b8ab2..c97693b1cdc7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -62,6 +62,7 @@ import org.apache.nifi.lifecycle.LifeCycleStartException; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.NarClassLoadersHolder; +import org.apache.nifi.nar.NarManager; import org.apache.nifi.persistence.FlowConfigurationDAO; import org.apache.nifi.persistence.StandardFlowConfigurationDAO; import org.apache.nifi.reporting.Bulletin; @@ -121,6 +122,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private final AtomicReference saveHolder = new AtomicReference<>(null); private final ClusterCoordinator clusterCoordinator; private final RevisionManager revisionManager; + private final NarManager narManager; private volatile SaveReportingTask saveReportingTask; /** @@ -150,9 +152,10 @@ public static StandardFlowService createStandaloneInstance( final FlowController controller, final NiFiProperties nifiProperties, final RevisionManager revisionManager, + final NarManager narManager, final Authorizer authorizer) throws IOException { - return new StandardFlowService(controller, nifiProperties, null, false, null, revisionManager, authorizer); + return new StandardFlowService(controller, nifiProperties, null, false, null, revisionManager, narManager, authorizer); } public static StandardFlowService createClusteredInstance( @@ -161,9 +164,10 @@ public static StandardFlowService createClusteredInstance( final NodeProtocolSenderListener senderListener, final ClusterCoordinator coordinator, final RevisionManager revisionManager, + final NarManager narManager, final Authorizer authorizer) throws IOException { - return new StandardFlowService(controller, nifiProperties, senderListener, true, coordinator, revisionManager, authorizer); + return new StandardFlowService(controller, nifiProperties, senderListener, true, coordinator, revisionManager, narManager, authorizer); } private StandardFlowService( @@ -173,6 +177,7 @@ private StandardFlowService( final boolean configuredForClustering, final ClusterCoordinator clusterCoordinator, final RevisionManager revisionManager, + final NarManager narManager, final Authorizer authorizer) throws IOException { this.nifiProperties = nifiProperties; @@ -187,6 +192,7 @@ private StandardFlowService( clusterCoordinator.setFlowService(this); } this.revisionManager = revisionManager; + this.narManager = narManager; this.authorizer = authorizer; if (configuredForClustering) { @@ -950,6 +956,9 @@ private void loadFromConnectionResponse(final ConnectionResponse response) throw nodeId = response.getNodeIdentifier(); controller.setNodeId(nodeId); + // sync NARs before loading flow, otherwise components could be ghosted and fail to join the cluster + narManager.syncWithClusterCoordinator(); + // load new controller state loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java index d723c9a81139..7bd479376229 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java @@ -49,9 +49,14 @@ import org.apache.nifi.nar.StandardNarManager; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.services.FlowService; +import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.validation.RuleViolationsManager; import org.apache.nifi.validation.StandardRuleViolationsManager; +import org.apache.nifi.web.client.StandardWebClientService; +import org.apache.nifi.web.client.api.WebClientService; +import org.apache.nifi.web.client.redirect.RedirectHandling; +import org.apache.nifi.web.client.ssl.TlsContext; import org.apache.nifi.web.revision.RevisionManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -59,6 +64,11 @@ import org.springframework.context.annotation.Configuration; import javax.net.ssl.SSLContext; +import javax.net.ssl.X509KeyManager; +import javax.net.ssl.X509TrustManager; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * Framework Flow Controller Configuration class for Spring Application @@ -80,6 +90,10 @@ public class FlowControllerConfiguration { private SSLContext sslContext; + private X509KeyManager keyManager; + + private X509TrustManager trustManager; + private StateManagerProvider stateManagerProvider; private BulletinRepository bulletinRepository; @@ -137,6 +151,16 @@ public void setSslContext(final SSLContext sslContext) { this.sslContext = sslContext; } + @Autowired(required = false) + public void setKeyManager(final X509KeyManager keyManager) { + this.keyManager = keyManager; + } + + @Autowired(required = false) + public void setTrustManager(final X509TrustManager trustManager) { + this.trustManager = trustManager; + } + @Qualifier("nodeProtocolSender") @Autowired(required = false) public void setNodeProtocolSender(final NodeProtocolSender nodeProtocolSender) { @@ -221,6 +245,7 @@ public FlowService flowService() throws Exception { flowController(), properties, revisionManager, + narManager(), authorizer ); } else { @@ -230,6 +255,7 @@ public FlowService flowService() throws Exception { nodeProtocolSenderListener, clusterCoordinator, revisionManager, + narManager(), authorizer ); } @@ -330,6 +356,38 @@ public RuntimeManifestService runtimeManifestService() { return new StandardRuntimeManifestService(extensionManager, extensionManifestParser()); } + @Bean + public WebClientService webClientService() { + final long readTimeoutMillis = FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS); + final Duration timeout = Duration.ofMillis(readTimeoutMillis); + + final StandardWebClientService webClientService = new StandardWebClientService(); + webClientService.setConnectTimeout(timeout); + webClientService.setReadTimeout(timeout); + webClientService.setRedirectHandling(RedirectHandling.FOLLOWED); + + if (sslContext != null) { + webClientService.setTlsContext(new TlsContext() { + @Override + public String getProtocol() { + return sslContext.getProtocol(); + } + + @Override + public X509TrustManager getTrustManager() { + return trustManager; + } + + @Override + public Optional getKeyManager() { + return Optional.of(keyManager); + } + }); + } + + return webClientService; + } + /** * NAR Loader from the holder that was set by Jetty. * @@ -363,7 +421,9 @@ public NarManager narManager() throws Exception { flowController(), clusterCoordinator, narComponentManager(), - narLoader() + narLoader(), + webClientService(), + sslContext ); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarInstallTask.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarInstallTask.java index 3ced1a50dd02..ea35e8e57c89 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarInstallTask.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarInstallTask.java @@ -75,7 +75,9 @@ public void run() { // If replacing an existing NAR with the same coordinate, then unload the existing NAR and stop+ghost any components from it final StandardStoppedComponents stoppedComponents = new StandardStoppedComponents(controllerServiceProvider); final Bundle existingBundle = extensionManager.getBundle(coordinate); - if (existingBundle != null) { + if (existingBundle == null) { + LOGGER.info("Installing NAR [{}] with coordinate [{}]", narNode.getIdentifier(), coordinate); + } else { LOGGER.info("Replacing NAR [{}], unloading existing NAR and components", coordinate); narLoader.unload(existingBundle); narComponentManager.unloadComponents(coordinate, stoppedComponents); @@ -88,6 +90,7 @@ public void run() { // the NAR now becomes available, as well as restoring any component that may have been purposely unloaded above for replacing an existing NAR for (final Bundle loadedBundle : narLoadResult.getLoadedBundles()) { final BundleCoordinate loadedCoordinate = loadedBundle.getBundleDetails().getCoordinate(); + LOGGER.info("NAR [{}] was installed", loadedCoordinate); if (loadedCoordinate.equals(coordinate)) { narNode.setState(NarState.INSTALLED); } else { @@ -102,6 +105,7 @@ public void run() { for (final BundleDetails skippedBundles : narLoadResult.getSkippedBundles()) { final BundleCoordinate skippedCoordinate = skippedBundles.getCoordinate(); + LOGGER.info("NAR [{}] is missing dependency", skippedCoordinate); if (skippedCoordinate.equals(coordinate)) { narNode.setState(NarState.MISSING_DEPENDENCY); } else { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java new file mode 100644 index 000000000000..ecad3b95419d --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.nar; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.web.api.entity.NarSummariesEntity; +import org.apache.nifi.web.client.StandardHttpUriBuilder; +import org.apache.nifi.web.client.api.HttpRequestBodySpec; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.WebClientService; +import org.apache.nifi.web.client.api.WebClientServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +/** + * Encapsulate API calls for listing and downloading NARs through the REST API of a NiFi node. + */ +public class NarRestApiClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(NarRestApiClient.class); + + private static final String HTTP_SCHEME = "http"; + private static final String HTTPS_SCHEME = "https"; + + private static final String NIFI_API_PATH = "nifi-api"; + private static final String CONTROLLER_PATH = "controller"; + private static final String NAR_MANAGER_PATH = "nar-manager"; + private static final String NARS_PATH = "nars"; + private static final String NAR_DOWNLOAD_PATH = "download"; + + private final URI baseUri; + private final WebClientService webClientService; + private final ObjectMapper objectMapper; + + public NarRestApiClient(final WebClientService webClientService, final String host, final int port, final boolean secure) { + try { + this.baseUri = new URI(secure ? HTTPS_SCHEME : HTTP_SCHEME, null, host, port, null, null, null); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + this.webClientService = Objects.requireNonNull(webClientService, "WebClientService is required"); + this.objectMapper = new ObjectMapper(); + this.objectMapper.setAnnotationIntrospector(new JakartaXmlBindAnnotationIntrospector(objectMapper.getTypeFactory())); + } + + public NarSummariesEntity listNarSummaries() { + final URI requestUri = new StandardHttpUriBuilder() + .scheme(baseUri.getScheme()) + .host(baseUri.getHost()) + .port(baseUri.getPort()) + .addPathSegment(NIFI_API_PATH) + .addPathSegment(CONTROLLER_PATH) + .addPathSegment(NAR_MANAGER_PATH) + .addPathSegment(NARS_PATH) + .build(); + LOGGER.info("Requesting NAR summaries from {}", requestUri); + + final HttpRequestBodySpec requestBodySpec = webClientService.get() + .uri(requestUri) + .header("Accept", "application/json"); + + try (final HttpResponseEntity response = requestBodySpec.retrieve()) { + final InputStream responseBody = getResponseBody(requestUri, response); + return objectMapper.readValue(responseBody, NarSummariesEntity.class); + } catch (final WebClientServiceException | IOException e) { + throw new NarRestApiRetryableException(e.getMessage(), e); + } + } + + public InputStream downloadNar(final String identifier) { + final URI requestUri = new StandardHttpUriBuilder() + .scheme(baseUri.getScheme()) + .host(baseUri.getHost()) + .port(baseUri.getPort()) + .addPathSegment(NIFI_API_PATH) + .addPathSegment(CONTROLLER_PATH) + .addPathSegment(NAR_MANAGER_PATH) + .addPathSegment(NARS_PATH) + .addPathSegment(identifier) + .addPathSegment(NAR_DOWNLOAD_PATH) + .build(); + LOGGER.info("Downloading NAR [{}] from {}", identifier, requestUri); + + try { + final HttpResponseEntity response = webClientService.get() + .uri(requestUri) + .header("Accept", "application/octet-stream") + .retrieve(); + return getResponseBody(requestUri, response); + } catch (final WebClientServiceException e) { + throw new NarRestApiRetryableException(e.getMessage(), e); + } + } + + private InputStream getResponseBody(final URI requestUri, final HttpResponseEntity response) { + final int statusCode = response.statusCode(); + if (HttpResponseStatus.isSuccessful(statusCode)) { + return response.body(); + } else { + final String responseMessage; + try { + responseMessage = IOUtils.toString(response.body(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new NarRestApiRetryableException("Error reading response from " + requestUri + " - " + statusCode, e); + } + if (statusCode == HttpResponseStatus.CONFLICT.getCode()) { + throw new NarRestApiRetryableException("Error calling " + requestUri + " - " + statusCode + " - " + responseMessage); + } else { + throw new IllegalStateException("Error calling " + requestUri + " - " + statusCode + " - " + responseMessage); + } + } + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiRetryableException.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiRetryableException.java new file mode 100644 index 000000000000..83e722e2d90c --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiRetryableException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.nar; + +/** + * Thrown from {@link NarRestApiClient} to indicate an error for a request that should be retried. + */ +public class NarRestApiRetryableException extends RuntimeException { + + public NarRestApiRetryableException(final String message) { + super(message); + } + + public NarRestApiRetryableException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java index 908c3a43c261..adf2799fcac7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java @@ -20,19 +20,27 @@ import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.NarSummaryDTO; +import org.apache.nifi.web.api.entity.NarSummariesEntity; +import org.apache.nifi.web.api.entity.NarSummaryEntity; +import org.apache.nifi.web.client.api.WebClientService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; +import javax.net.ssl.SSLContext; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HexFormat; @@ -52,26 +60,37 @@ public class StandardNarManager implements NarManager, InitializingBean, Disposa private static final Logger LOGGER = LoggerFactory.getLogger(StandardNarManager.class); + private static final Duration MAX_WAIT_TIME_FOR_CLUSTER_COORDINATOR = Duration.ofSeconds(60); + private static final Duration MAX_WAIT_TIME_FOR_NARS = Duration.ofMinutes(5); + private final ClusterCoordinator clusterCoordinator; private final ExtensionManager extensionManager; private final ControllerServiceProvider controllerServiceProvider; private final NarPersistenceProvider persistenceProvider; private final NarComponentManager narComponentManager; private final NarLoader narLoader; + private final WebClientService webClientService; + private final SSLContext sslContext; private final Map narNodesById = new ConcurrentHashMap<>(); private final Map> installFuturesById = new ConcurrentHashMap<>(); private final ExecutorService installExecutorService; private final ExecutorService deleteExecutorService; - public StandardNarManager(final FlowController flowController, final ClusterCoordinator clusterCoordinator, - final NarComponentManager narComponentManager, final NarLoader narLoader) { + public StandardNarManager(final FlowController flowController, + final ClusterCoordinator clusterCoordinator, + final NarComponentManager narComponentManager, + final NarLoader narLoader, + final WebClientService webClientService, + final SSLContext sslContext) { this.clusterCoordinator = clusterCoordinator; this.extensionManager = flowController.getExtensionManager(); this.controllerServiceProvider = flowController.getControllerServiceProvider(); this.persistenceProvider = flowController.getNarPersistenceProvider(); this.narComponentManager = narComponentManager; this.narLoader = narLoader; + this.webClientService = webClientService; + this.sslContext = sslContext; this.installExecutorService = Executors.newSingleThreadExecutor(); this.deleteExecutorService = Executors.newSingleThreadExecutor(); } @@ -81,21 +100,35 @@ public StandardNarManager(final FlowController flowController, final ClusterCoor // 2. NarLoader keeps track of NARs that were missing dependencies to consider them on future loads, so this restores state that may have been lost on a restart @Override public void afterPropertiesSet() throws IOException { - final Collection narFiles = persistenceProvider.getAllNarInfo().stream() + final Collection narInfos = persistenceProvider.getAllNarInfo(); + LOGGER.info("Initializing NAR Manager, loading {} previously stored NARs", narInfos.size()); + + final Collection narFiles = narInfos.stream() .map(NarPersistenceInfo::getNarFile) - .collect(Collectors.toList()); - LOGGER.info("Initializing NAR Manager, loading {} previously stored NARs", narFiles.size()); + .collect(Collectors.toSet()); narLoader.load(narFiles); - for (final File narFile : narFiles) { + for (final NarPersistenceInfo narInfo : narInfos) { + final File narFile = narInfo.getNarFile(); try { final NarManifest manifest = NarManifest.fromFile(narFile); final BundleCoordinate coordinate = manifest.getCoordinate(); final String identifier = createIdentifier(coordinate); final NarState state = determineNarState(manifest); final String narDigest = computeNarDigest(narFile); + + final NarNode narNode = NarNode.builder() + .identifier(identifier) + .narFile(narFile) + .narFileHexDigest(narDigest) + .manifest(manifest) + .source(NarSource.valueOf(narInfo.getNarProperties().getSourceType())) + .sourceIdentifier(narInfo.getNarProperties().getSourceId()) + .state(state) + .build(); + + narNodesById.put(identifier, narNode); LOGGER.debug("Loaded NAR [{}] with state [{}] and identifier [{}]", coordinate, state, identifier); - narNodesById.put(identifier, new NarNode(identifier, narFile, narDigest, manifest, state)); } catch (final Exception e) { LOGGER.warn("Failed to load NAR Manifest for [{}]", narFile.getAbsolutePath(), e); } @@ -122,10 +155,14 @@ private void shutdownExecutor(final ExecutorService executorService, final Strin @Override public NarNode installNar(final NarInstallRequest installRequest) throws IOException { + return installNar(installRequest, true); + } + + private NarNode installNar(final NarInstallRequest installRequest, final boolean async) throws IOException { final InputStream inputStream = installRequest.getInputStream(); final File tempNarFile = persistenceProvider.createTempFile(inputStream); try { - return installNar(installRequest, tempNarFile); + return installNar(installRequest, tempNarFile, async); } finally { if (tempNarFile.exists() && !tempNarFile.delete()) { LOGGER.warn("Failed to delete temp NAR file at [{}], file must be cleaned up manually", tempNarFile.getAbsolutePath()); @@ -135,7 +172,7 @@ public NarNode installNar(final NarInstallRequest installRequest) throws IOExcep // The outer install method is not synchronized since copying the stream to the temp file make take a long time, so we // synchronize here after already having the temp file to ensure only one request is checked and submitted for installing - private synchronized NarNode installNar(final NarInstallRequest installRequest, final File tempNarFile) throws IOException { + private synchronized NarNode installNar(final NarInstallRequest installRequest, final File tempNarFile, final boolean async) throws IOException { final NarManifest manifest = getNarManifest(tempNarFile); final BundleCoordinate coordinate = manifest.getCoordinate(); @@ -158,24 +195,37 @@ private synchronized NarNode installNar(final NarInstallRequest installRequest, .build(); final NarPersistenceInfo narPersistenceInfo = persistenceProvider.saveNar(persistenceContext, tempNarFile); - final File narFile = narPersistenceInfo.getNarFile(); + final File narFile = narPersistenceInfo.getNarFile(); final String identifier = createIdentifier(coordinate); final String narDigest = computeNarDigest(narFile); - final NarNode narNode = new NarNode(identifier, narFile, narDigest, manifest, NarState.WAITING_TO_INSTALL); - narNodesById.put(identifier, narNode); - LOGGER.info("Submitting install task for NAR with id [{}] and coordinate [{}]", identifier, coordinate); + final NarNode narNode = NarNode.builder() + .identifier(identifier) + .narFile(narFile) + .narFileHexDigest(narDigest) + .manifest(manifest) + .source(installRequest.getSource()) + .sourceIdentifier(installRequest.getSourceIdentifier()) + .state(NarState.WAITING_TO_INSTALL) + .build(); + narNodesById.put(identifier, narNode); final NarInstallTask installTask = createInstallTask(narNode); - final Future installTaskFuture = installExecutorService.submit(installTask); - installFuturesById.put(identifier, installTaskFuture); - + if (async) { + LOGGER.info("Submitting install task for NAR with id [{}] and coordinate [{}]", identifier, coordinate); + final Future installTaskFuture = installExecutorService.submit(installTask); + installFuturesById.put(identifier, installTaskFuture); + } else { + LOGGER.info("Synchronously installing NAR with id [{}] and coordinate [{}]", identifier, coordinate); + installTask.run(); + } return narNode; } @Override public void completeInstall(final String identifier) { + LOGGER.info("Completed install for NAR [{}]", identifier); installFuturesById.remove(identifier); } @@ -259,6 +309,130 @@ public synchronized InputStream readNar(final String identifier) { } } + @Override + public synchronized void syncWithClusterCoordinator() { + if (clusterCoordinator == null) { + LOGGER.info("Cluster coordinator is null, will not sync NARs"); + return; + } + + // This sync method is called from the method that loads the flow from a connection response, which means there must already be a cluster coordinator to + // have gotten a response from, but during testing there were cases where calling clusterCoordinator.getElectedActiveCoordinatorNode() was still null, so + // the helper method here will keep checking for the identifier up to a certain threshold to avoid slight timing issues + final NodeIdentifier coordinatorNodeId = getElectedActiveCoordinatorNode(); + if (coordinatorNodeId == null) { + LOGGER.warn("Unable to obtain the node identifier for the cluster coordinator, will not sync NARs"); + return; + } + + LOGGER.info("Determined cluster coordinator is at {}", coordinatorNodeId); + if (clusterCoordinator.isActiveClusterCoordinator()) { + LOGGER.info("Current node is the cluster coordinator, will not sync NARs"); + return; + } + + LOGGER.info("Synchronizing NARs with cluster coordinator"); + final String coordinatorAddress = coordinatorNodeId.getApiAddress(); + final int coordinatorPort = coordinatorNodeId.getApiPort(); + final NarRestApiClient narRestApiClient = new NarRestApiClient(webClientService, coordinatorAddress, coordinatorPort, sslContext != null); + + final int localNarCountBeforeSync = narNodesById.size(); + try { + // This node may try to retrieve the summaries from the coordinator while the coordinator still hasn't finished initializing its flow controller and + // the response will be a 409, so the helper method here will catch any retryable exceptions and retry the request up to a configured threshold + final NarSummariesEntity narSummaries = getNarSummariesFromCoordinator(narRestApiClient); + if (narSummaries == null) { + LOGGER.error("Unable to retrieve listing of NARs from cluster coordinator within the maximum amount of time, will not sync NARs"); + return; + } + + LOGGER.info("Cluster coordinator returned {} NAR summaries", narSummaries.getNarSummaries().size()); + + for (final NarSummaryEntity narSummaryEntity : narSummaries.getNarSummaries()) { + final NarSummaryDTO narSummaryDTO = narSummaryEntity.getNarSummary(); + final String coordinatorNarId = narSummaryDTO.getIdentifier(); + final String coordinatorNarDigest = narSummaryDTO.getDigest(); + final NarNode matchingNar = narNodesById.get(coordinatorNarId); + if (matchingNar == null) { + LOGGER.info("Coordinator has NAR [{}] which does not exist locally, will download", coordinatorNarId); + downloadNar(narRestApiClient, narSummaryDTO); + } else if (!coordinatorNarDigest.equals(matchingNar.getNarFileHexDigest())) { + LOGGER.info("Coordinator has NAR [{}] which exists locally with a different digest, will download", coordinatorNarId); + downloadNar(narRestApiClient, narSummaryDTO); + } else { + LOGGER.info("Coordinator has NAR [{}] which exists locally with a matching digest, will not download", coordinatorNarId); + } + } + } catch (final Exception e) { + // if the current node has existing NARs and the sync fails then we throw an exception to fail start up, otherwise we don't know if the digests of the local NARs + // match with the coordinator which could result in joining the cluster and running slightly different code on one node + // if the current node has no existing NARs then we can let the node proceed and attempt to join the cluster because maybe the cluster coordinator had no NARs anyway, + // and if it did then flow synchronization will fail after this because the local flow will have ghosted components that are not ghosted in the cluster + if (localNarCountBeforeSync > 0) { + throw new RuntimeException("Failed to sync NARs from cluster coordinator", e); + } else { + LOGGER.error("Failed to sync NARs from cluster coordinator, no NARs exist locally, will proceed", e); + } + } + } + + private void downloadNar(final NarRestApiClient narRestApiClient, final NarSummaryDTO narSummary) throws IOException { + try (final InputStream coordinatorNarInputStream = narRestApiClient.downloadNar(narSummary.getIdentifier())) { + final NarInstallRequest installRequest = NarInstallRequest.builder() + .source(NarSource.valueOf(narSummary.getSourceType())) + .sourceIdentifier(narSummary.getSourceIdentifier()) + .inputStream(coordinatorNarInputStream) + .build(); + installNar(installRequest, false); + } + } + + private NarSummariesEntity getNarSummariesFromCoordinator(final NarRestApiClient narRestApiClient) { + final Instant waitUntilInstant = Instant.ofEpochMilli(System.currentTimeMillis() + MAX_WAIT_TIME_FOR_NARS.toMillis()); + while (System.currentTimeMillis() < waitUntilInstant.toEpochMilli()) { + final NarSummariesEntity narSummaries = listNarSummaries(narRestApiClient); + if (narSummaries != null) { + return narSummaries; + } + LOGGER.info("Unable to retrieve NAR summaries from cluster coordinator, will retry until [{}]", waitUntilInstant); + sleep(Duration.ofSeconds(5)); + } + return null; + } + + private NarSummariesEntity listNarSummaries(final NarRestApiClient narRestApiClient) { + try { + return narRestApiClient.listNarSummaries(); + } catch (final NarRestApiRetryableException e) { + LOGGER.warn("[{}], will retry", e.getMessage()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("", e); + } + return null; + } + } + + private NodeIdentifier getElectedActiveCoordinatorNode() { + final Instant waitUntilInstant = Instant.ofEpochMilli(System.currentTimeMillis() + MAX_WAIT_TIME_FOR_CLUSTER_COORDINATOR.toMillis()); + while (System.currentTimeMillis() < waitUntilInstant.toEpochMilli()) { + final NodeIdentifier coordinatorNodeId = clusterCoordinator.getElectedActiveCoordinatorNode(); + if (coordinatorNodeId != null) { + return coordinatorNodeId; + } + LOGGER.info("Node identifier for the active cluster coordinator is not known yet, will retry until [{}]", waitUntilInstant); + sleep(Duration.ofSeconds(2)); + } + return null; + } + + private void sleep(final Duration duration) { + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + private NarNode getNarNodeOrThrowNotFound(final String identifier) { final NarNode narNode = narNodesById.get(identifier); if (narNode == null) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java index 52b5090b0173..b4e93ed218f9 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java @@ -159,6 +159,7 @@ public void preDestruction() throws AuthorizerDestructionException { flowController, props, null, // revision manager + null, // NAR Manager authorizer); diagnosticsFactory = new BootstrapDiagnosticsFactory(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index 3aa31433f982..277ffdd1fae6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -2478,6 +2478,7 @@ public Response uploadNar( } } + final long startTime = System.currentTimeMillis(); final InputStream maxLengthInputStream = new MaxLengthInputStream(inputStream, (long) DataUnit.GB.toB(1)); if (isReplicateRequest()) { @@ -2494,6 +2495,11 @@ public Response uploadNar( } final NarSummaryEntity summaryEntity = serviceFacade.uploadNar(maxLengthInputStream); + final NarSummaryDTO summary = summaryEntity.getNarSummary(); + + final long elapsedTime = System.currentTimeMillis() - startTime; + LOGGER.info("Upload completed for NAR [{}] in {} ms", summary.getIdentifier(), elapsedTime); + return generateOkResponse(summaryEntity).build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 4bc9e947da6b..992eaf8e4250 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -5055,6 +5055,8 @@ public NarSummaryDTO createNarSummaryDto(final NarNode narNode) { dto.setBuildTime(narManifest.getBuildTimestamp()); dto.setCreatedBy(narManifest.getCreatedBy()); dto.setDigest(narNode.getNarFileHexDigest()); + dto.setSourceType(narNode.getSource().name()); + dto.setSourceIdentifier(narNode.getSourceIdentifier()); dto.setState(narNode.getState().getValue()); dto.setFailureMessage(narNode.getFailureMessage()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/dto/DtoFactoryTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/dto/DtoFactoryTest.java index c82c43bac216..84a31839344a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/dto/DtoFactoryTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/dto/DtoFactoryTest.java @@ -23,6 +23,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.nar.NarManifest; import org.apache.nifi.nar.NarNode; +import org.apache.nifi.nar.NarSource; import org.apache.nifi.nar.NarState; import org.apache.nifi.nar.StandardExtensionDiscoveringManager; import org.apache.nifi.nar.SystemBundle; @@ -128,7 +129,15 @@ public void testCreateNarSummaryDtoWhenInstalled() { .createdBy("Maven NAR Plugin") .build(); - final NarNode narNode = new NarNode(UUID.randomUUID().toString(), new File("does-not-ext"), "nar-digest", narManifest, NarState.INSTALLED); + final NarNode narNode = NarNode.builder() + .identifier(UUID.randomUUID().toString()) + .narFile(new File("does-not-ext")) + .narFileHexDigest("nar-digest") + .manifest(narManifest) + .source(NarSource.UPLOAD) + .sourceIdentifier("1234") + .state(NarState.INSTALLED) + .build(); final DtoFactory dtoFactory = new DtoFactory(); final NarSummaryDTO summaryDTO = dtoFactory.createNarSummaryDto(narNode); @@ -137,6 +146,8 @@ public void testCreateNarSummaryDtoWhenInstalled() { assertEquals(narManifest.getCreatedBy(), summaryDTO.getCreatedBy()); assertEquals(narNode.getNarFileHexDigest(), summaryDTO.getDigest()); assertEquals(narNode.getState().getValue(), summaryDTO.getState()); + assertEquals(narNode.getSource().name(), summaryDTO.getSourceType()); + assertEquals(narNode.getSourceIdentifier(), summaryDTO.getSourceIdentifier()); assertTrue(summaryDTO.isInstallComplete()); assertNull(summaryDTO.getFailureMessage()); assertNull(summaryDTO.getDependencyCoordinate()); @@ -158,7 +169,15 @@ public void testCreateNarSummaryDtoWhenInstalling() { .createdBy("Maven NAR Plugin") .build(); - final NarNode narNode = new NarNode(UUID.randomUUID().toString(), new File("does-not-ext"), "nar-digest", narManifest, NarState.INSTALLING); + final NarNode narNode = NarNode.builder() + .identifier(UUID.randomUUID().toString()) + .narFile(new File("does-not-ext")) + .narFileHexDigest("nar-digest") + .manifest(narManifest) + .source(NarSource.UPLOAD) + .sourceIdentifier("1234") + .state(NarState.INSTALLING) + .build(); final DtoFactory dtoFactory = new DtoFactory(); final NarSummaryDTO summaryDTO = dtoFactory.createNarSummaryDto(narNode); @@ -167,6 +186,8 @@ public void testCreateNarSummaryDtoWhenInstalling() { assertEquals(narManifest.getCreatedBy(), summaryDTO.getCreatedBy()); assertEquals(narNode.getNarFileHexDigest(), summaryDTO.getDigest()); assertEquals(narNode.getState().getValue(), summaryDTO.getState()); + assertEquals(narNode.getSource().name(), summaryDTO.getSourceType()); + assertEquals(narNode.getSourceIdentifier(), summaryDTO.getSourceIdentifier()); assertFalse(summaryDTO.isInstallComplete()); assertNull(summaryDTO.getFailureMessage());