diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NarUploadDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NarUploadDTO.java new file mode 100644 index 0000000000000..e77ceec6a00cb --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/NarUploadDTO.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import jakarta.xml.bind.annotation.XmlType; + +@XmlType(name = "narUpload") +public class NarUploadDTO { + + +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java index a8d54ac4bb01c..1faa76a6b9ad2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java @@ -239,6 +239,7 @@ private T replicateRequest(final NodeIdentifier nodeId, final UploadRequest< final NiFiUser user = uploadRequest.getUser(); final String filename = uploadRequest.getFilename(); + final String uploadId = uploadRequest.getIdentifier(); try (final InputStream inputStream = new FileInputStream(contents)) { final HttpRequestBodySpec request = webClientService.post() @@ -246,6 +247,7 @@ private T replicateRequest(final NodeIdentifier nodeId, final UploadRequest< .body(inputStream, OptionalLong.of(inputStream.available())) .header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE) .header(FILENAME_HEADER, filename) + .header(UPLOAD_ID, uploadId) // Special NiFi-specific headers to indicate that the request should be performed and not replicated to the nodes .header(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER, "true") .header(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true") diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java index 66e4d73098606..e2fd524642f9d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java @@ -32,6 +32,7 @@ public class UploadRequest { private final NiFiUser user; private final String filename; + private final String identifier; private final InputStream contents; private final URI exampleRequestUri; private final Class responseClass; @@ -39,6 +40,7 @@ public class UploadRequest { private UploadRequest(final Builder builder) { this.user = Objects.requireNonNull(builder.user); this.filename = Objects.requireNonNull(builder.filename); + this.identifier = Objects.requireNonNull(builder.identifier); this.contents = Objects.requireNonNull(builder.contents); this.exampleRequestUri = Objects.requireNonNull(builder.exampleRequestUri); this.responseClass = Objects.requireNonNull(builder.responseClass); @@ -52,6 +54,10 @@ public String getFilename() { return filename; } + public String getIdentifier() { + return identifier; + } + public InputStream getContents() { return contents; } @@ -67,6 +73,7 @@ public Class getResponseClass() { public static final class Builder { private NiFiUser user; private String filename; + private String identifier; private InputStream contents; private URI exampleRequestUri; private Class responseClass; @@ -81,6 +88,11 @@ public Builder filename(String filename) { return this; } + public Builder identifier(String identifier) { + this.identifier = identifier; + return this; + } + public Builder contents(InputStream contents) { this.contents = contents; return this; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequestReplicator.java index c90f61fc87210..2bc355046e333 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequestReplicator.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequestReplicator.java @@ -25,6 +25,7 @@ public interface UploadRequestReplicator { String FILENAME_HEADER = "Filename"; + String UPLOAD_ID = "Upload-Id"; String CONTENT_TYPE_HEADER = "Content-Type"; String UPLOAD_CONTENT_TYPE = "application/octet-stream"; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarManager.java index d1a497ae62a52..1b59ae1dba5f4 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarManager.java @@ -24,7 +24,11 @@ public interface NarManager { - BundleCoordinate addNar(InputStream inputStream) throws IOException; + NarUpload uploadNar(String uploadId, InputStream inputStream) throws IOException; + + NarUpload getUpload(String uploadId); + + NarUpload deleteUpload(String uploadId); void verifyDeleteNar(BundleCoordinate coordinate, boolean forceDelete); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarState.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarState.java new file mode 100644 index 0000000000000..c68634beb69be --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarState.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.nar; + +public enum NarState { + LOADED, + MISSING_DEPENDENCY +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarUpload.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarUpload.java new file mode 100644 index 0000000000000..6832ac80dcf5d --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarUpload.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.nar; + +public class NarUpload { + + private final String identifier; + private final NarManifest narManifest; + + private NarUploadStatus uploadStatus; + private NarState narState; + + public NarUpload(final String identifier, final NarManifest narManifest) { + this.identifier = identifier; + this.narManifest = narManifest; + this.uploadStatus = NarUploadStatus.UPLOADING; + } + + public NarUpload(final NarUpload other) { + this.identifier = other.identifier; + this.narManifest = other.narManifest; + this.uploadStatus = other.uploadStatus; + this.narState = other.narState; + } + + public String getIdentifier() { + return identifier; + } + + public NarManifest getNarManifest() { + return narManifest; + } + + public NarUploadStatus getUploadStatus() { + return uploadStatus; + } + + public void setUploadStatus(final NarUploadStatus uploadStatus) { + this.uploadStatus = uploadStatus; + } + + public NarState getNarState() { + return narState; + } + + public void setNarState(final NarState narState) { + this.narState = narState; + } + +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarUploadStatus.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarUploadStatus.java new file mode 100644 index 0000000000000..0036cc86bf04d --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarUploadStatus.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.nar; + +public enum NarUploadStatus { + UPLOADING, + LOADING_NAR, + RELOADING_COMPONENTS, + COMPLETED +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java index a141d0b54ccd2..c15d99888744c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java @@ -17,6 +17,7 @@ package org.apache.nifi.nar; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.controller.ComponentNode; @@ -38,8 +39,10 @@ import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.FlowRegistryClientNode; import org.apache.nifi.reporting.ReportingTask; +import org.apache.nifi.web.ResourceNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import java.io.File; @@ -54,13 +57,18 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; -public class StandardNarManager implements NarManager, InitializingBean { +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class StandardNarManager implements NarManager, InitializingBean, DisposableBean { private static final Logger LOGGER = LoggerFactory.getLogger(StandardNarManager.class); @@ -81,6 +89,9 @@ public class StandardNarManager implements NarManager, InitializingBean { private final NarPersistenceProvider persistenceProvider; private final NarLoader narLoader; + private final Map uploadRequests = new ConcurrentHashMap<>(); + private final ExecutorService uploadExecutorService; + public StandardNarManager(final FlowController flowController, final NarLoader narLoader) { this.flowManager = flowController.getFlowManager(); this.extensionManager = flowController.getExtensionManager(); @@ -88,6 +99,7 @@ public StandardNarManager(final FlowController flowController, final NarLoader n this.controllerServiceProvider = flowController.getControllerServiceProvider(); this.persistenceProvider = flowController.getNarPersistenceProvider(); this.narLoader = narLoader; + this.uploadExecutorService = Executors.newSingleThreadExecutor(); } // This serves two purposes... @@ -103,19 +115,33 @@ public void afterPropertiesSet() throws IOException { } @Override - public synchronized BundleCoordinate addNar(final InputStream inputStream) throws IOException { + public void destroy() throws Exception { + uploadExecutorService.shutdown(); + try { + if (!uploadExecutorService.awaitTermination(5000, MILLISECONDS)) { + uploadExecutorService.shutdownNow(); + } + } catch (InterruptedException ignore) { + LOGGER.info("Forcing shutdown of NAR Manager Upload Executor Service"); + uploadExecutorService.shutdownNow(); + } + } + + @Override + public synchronized NarUpload uploadNar(final String uploadId, final InputStream inputStream) throws IOException { + if (uploadRequests.containsKey(uploadId)) { + throw new IllegalArgumentException("A NAR Upload already exists with the given upload id"); + } + final File tempNarFile = persistenceProvider.createTempFile(inputStream); try { final NarManifest manifest = getNarManifest(tempNarFile); final BundleCoordinate coordinate = manifest.getCoordinate(); final Bundle existingBundle = extensionManager.getBundle(coordinate); - final boolean previouslyExistedInExtensionManager = existingBundle != null; - final boolean previouslyExistedInPersistenceProvider = persistenceProvider.exists(coordinate); - - if (previouslyExistedInExtensionManager && !previouslyExistedInPersistenceProvider) { + if (existingBundle != null && !persistenceProvider.exists(coordinate)) { deleteFileQuietly(tempNarFile); - throw new IllegalStateException("Another NAR is registered with the same coordinate and can not be replaced because it is not part of the NAR Manager"); + throw new IllegalStateException("A NAR is already registered with the same coordinate and can not be replaced because it is not part of the NAR Manager"); } final NarPersistenceContext persistenceContext = NarPersistenceContext.builder() @@ -125,32 +151,73 @@ public synchronized BundleCoordinate addNar(final InputStream inputStream) throw .build(); final NarPersistenceInfo narPersistenceInfo = persistenceProvider.saveNar(persistenceContext, tempNarFile); - final File narFile = narPersistenceInfo.getNarFile(); - final StoppedComponents stoppedComponents = new StoppedComponents(controllerServiceProvider); - if (previouslyExistedInExtensionManager) { - LOGGER.info("Unloading NAR and components for coordinate [{}] in order to replace NAR", coordinate); - narLoader.unload(existingBundle); - unloadComponents(coordinate, stoppedComponents); + final NarUpload narUpload = new NarUpload(uploadId, manifest); + uploadExecutorService.submit(() -> loadNar(narUpload, narPersistenceInfo, existingBundle)); + uploadRequests.put(uploadId, narUpload); + return narUpload; + } finally { + if (tempNarFile.exists() && !tempNarFile.delete()) { + LOGGER.warn("Failed to delete temp NAR file at [{}], file must be cleaned up manually", tempNarFile.getAbsolutePath()); } + } + } - // Load the NAR and attempt to un-ghost any components that can be provided by one of the loaded NARs, this handles a general ghosting case where - // the NAR now becomes available, as well as restoring any component that may have been purposely unloaded above for replacing an existing NAR - final NarLoadResult narLoadResult = narLoader.load(Collections.singleton(narFile), ALLOWED_EXTENSION_TYPES); - for (final Bundle loadedBundle : narLoadResult.getLoadedBundles()) { - final BundleCoordinate loadedCoordinate = loadedBundle.getBundleDetails().getCoordinate(); - loadMissingComponents(loadedCoordinate, stoppedComponents); - } + private void loadNar(final NarUpload narUpload, final NarPersistenceInfo narPersistenceInfo, final Bundle existingBundle) { + final File narFile = narPersistenceInfo.getNarFile(); + final BundleCoordinate coordinate = narPersistenceInfo.getNarProperties().getCoordinate(); + narUpload.setUploadStatus(NarUploadStatus.LOADING_NAR); - // Restore previously running/enabled components to their original state - stoppedComponents.startAll(); + // If replacing an existing NAR with the same coordinate, then unload the existing NAR and stop+ghost any components from it + final StoppedComponents stoppedComponents = new StoppedComponents(controllerServiceProvider); + if (existingBundle != null) { + LOGGER.info("Replacing NAR [{}], unloading existing NAR and components", coordinate); + narLoader.unload(existingBundle); + unloadComponents(coordinate, stoppedComponents); + } - return coordinate; - } finally { - if (tempNarFile.exists() && !tempNarFile.delete()) { - LOGGER.warn("Failed to delete temp NAR file at [{}], file must be cleaned up manually", tempNarFile.getAbsolutePath()); + // Attempt to load the NAR which will include any NARs that were previously skipped + final NarLoadResult narLoadResult = narLoader.load(Collections.singleton(narFile), ALLOWED_EXTENSION_TYPES); + narUpload.setUploadStatus(NarUploadStatus.RELOADING_COMPONENTS); + + // For any successfully loaded NARs, un-ghost any components that can be provided by one of the loaded NARs, this handles a general ghosting case where + // the NAR now becomes available, as well as restoring any component that may have been purposely unloaded above for replacing an existing NAR + for (final Bundle loadedBundle : narLoadResult.getLoadedBundles()) { + final BundleCoordinate loadedCoordinate = loadedBundle.getBundleDetails().getCoordinate(); + if (loadedCoordinate.equals(coordinate)) { + narUpload.setNarState(NarState.LOADED); } + loadMissingComponents(loadedCoordinate, stoppedComponents); + } + + // If NAR state is still null after looping over the loaded bundles, then the current NAR was skipped due to a missing dependency + if (narUpload.getNarState() == null) { + narUpload.setNarState(NarState.MISSING_DEPENDENCY); + } + + // Restore previously running/enabled components to their original state + stoppedComponents.startAll(); + + narUpload.setUploadStatus(NarUploadStatus.COMPLETED); + } + + @Override + public NarUpload getUpload(final String uploadId) { + final NarUpload uploadRequest = uploadRequests.get(uploadId); + if (uploadRequest == null) { + throw new ResourceNotFoundException("NAR Upload Request does not exist with the given id"); + } + return new NarUpload(uploadRequest); + } + + @Override + public NarUpload deleteUpload(final String uploadId) { + final NarUpload uploadRequest = uploadRequests.remove(uploadId); + if (uploadRequest == null) { + throw new ResourceNotFoundException("NAR Upload Request does not exist with the given id"); } + // TODO should we prevent deleting when not in COMPLETED state? + return new NarUpload(uploadRequest); } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 23dea07ca42bb..13f779d373011 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -2840,7 +2840,8 @@ ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingC // NAR Management methods // ---------------------------------------- - BundleEntity addNar(InputStream inputStream) throws IOException; + // Note - "upload" is purposely not part of the AOP write lock since an upload may take a significant amount of time and should not block all other operations + BundleEntity uploadNar(String uploadId, InputStream inputStream) throws IOException; void verifyDeleteNar(BundleCoordinate coordinate, boolean forceDelete); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 50c888e167e7e..5c79d6ef768d2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -128,6 +128,7 @@ import org.apache.nifi.metrics.jvm.JmxJvmMetrics; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarManager; +import org.apache.nifi.nar.NarUpload; import org.apache.nifi.parameter.Parameter; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContextLookup; @@ -6602,9 +6603,10 @@ public FlowAnalysisResultEntity createFlowAnalysisResultEntity(Collection uploadRequest = new UploadRequest.Builder() .user(NiFiUserUtils.getNiFiUser()) .filename(filename) + .identifier(UUID.randomUUID().toString()) .contents(inputStream) .exampleRequestUri(getAbsolutePath()) .responseClass(BundleEntity.class) @@ -2486,7 +2490,11 @@ public Response uploadNar( return generateOkResponse(bundleEntity).build(); } - final BundleEntity bundleEntity = serviceFacade.addNar(inputStream); + if (StringUtils.isBlank(uploadIdentifier)) { + throw new IllegalArgumentException(UploadRequestReplicator.UPLOAD_ID + " header is required"); + } + + final BundleEntity bundleEntity = serviceFacade.uploadNar(uploadIdentifier, inputStream); return generateOkResponse(bundleEntity).build(); }