Skip to content

Commit

Permalink
Checkpoint - refactoring save NAR
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed May 30, 2024
1 parent dc89c6a commit 983fc41
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 49 deletions.
40 changes: 40 additions & 0 deletions nifi-framework-api/src/main/java/org/apache/nifi/nar/NarFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import java.io.File;
import java.util.Objects;

public class NarFile {

private final File file;
private final NarManifest manifest;

public NarFile(final File file, final NarManifest manifest) {
this.file = Objects.requireNonNull(file, "File is required");
this.manifest = Objects.requireNonNull(manifest, "Manifest is required");
}

public File getFile() {
return file;
}

public NarManifest getManifest() {
return manifest;
}
}
225 changes: 225 additions & 0 deletions nifi-framework-api/src/main/java/org/apache/nifi/nar/NarManifest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;

public class NarManifest {

private final String group;
private final String id;
private final String version;

private final String dependencyGroup;
private final String dependencyId;
private final String dependencyVersion;

private final String buildTag;
private final String buildRevision;
private final String buildBranch;
private final String buildTimestamp;
private final String buildJdk;
private final String builtBy;

private NarManifest(final Builder builder) {
this.group = Objects.requireNonNull(builder.group);
this.id = Objects.requireNonNull(builder.id);
this.version = Objects.requireNonNull(builder.version);

this.dependencyGroup = builder.dependencyGroup;
this.dependencyId = builder.dependencyId;
this.dependencyVersion = builder.dependencyVersion;

this.buildTag = builder.buildTag;
this.buildRevision = builder.buildRevision;
this.buildBranch = builder.buildBranch;
this.buildTimestamp = builder.buildTimestamp;
this.buildJdk = builder.buildJdk;
this.builtBy = builder.builtBy;
}

public String getGroup() {
return group;
}

public String getId() {
return id;
}

public String getVersion() {
return version;
}

public String getDependencyGroup() {
return dependencyGroup;
}

public String getDependencyId() {
return dependencyId;
}

public String getDependencyVersion() {
return dependencyVersion;
}

public String getBuildTag() {
return buildTag;
}

public String getBuildRevision() {
return buildRevision;
}

public String getBuildBranch() {
return buildBranch;
}

public String getBuildTimestamp() {
return buildTimestamp;
}

public String getBuildJdk() {
return buildJdk;
}

public String getBuiltBy() {
return builtBy;
}

public static NarManifest fromFile(final File narFile) throws IOException {
try (final InputStream inputStream = new FileInputStream(narFile)) {
return fromInputStream(inputStream);
}
}

public static NarManifest fromInputStream(final InputStream inputStream) throws IOException {
try (final JarInputStream jarInputStream = new JarInputStream(inputStream)) {
final Manifest manifest = jarInputStream.getManifest();
if (manifest == null) {
throw new IllegalStateException("NAR content is missing required META-INF/MANIFEST entry");
}

final Attributes attributes = manifest.getMainAttributes();

return NarManifest.builder()
.group(attributes.getValue(NarManifestEntry.NAR_GROUP.getManifestName()))
.id(attributes.getValue(NarManifestEntry.NAR_ID.getManifestName()))
.version(attributes.getValue(NarManifestEntry.NAR_VERSION.getManifestName()))
.dependencyGroup(attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_GROUP.getManifestName()))
.dependencyId(attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_ID.getManifestName()))
.dependencyVersion(attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_VERSION.getManifestName()))
.buildTag(attributes.getValue(NarManifestEntry.BUILD_TAG.getManifestName()))
.buildRevision(attributes.getValue(NarManifestEntry.BUILD_REVISION.getManifestName()))
.buildBranch(attributes.getValue(NarManifestEntry.BUILD_BRANCH.getManifestName()))
.buildTimestamp(attributes.getValue(NarManifestEntry.BUILD_TIMESTAMP.getManifestName()))
.buildJdk(attributes.getValue(NarManifestEntry.BUILD_JDK.getManifestName()))
.builtBy(attributes.getValue(NarManifestEntry.BUILT_BY.getManifestName()))
.build();
}
}

public static Builder builder() {
return new Builder();
}

public static final class Builder {
private String group;
private String id;
private String version;
private String dependencyGroup;
private String dependencyId;
private String dependencyVersion;
private String buildTag;
private String buildRevision;
private String buildBranch;
private String buildTimestamp;
private String buildJdk;
private String builtBy;

public Builder group(final String group) {
this.group = group;
return this;
}

public Builder id(final String id) {
this.id = id;
return this;
}

public Builder version(final String version) {
this.version = version;
return this;
}

public Builder dependencyGroup(final String dependencyGroup) {
this.dependencyGroup = dependencyGroup;
return this;
}

public Builder dependencyId(final String dependencyId) {
this.dependencyId = dependencyId;
return this;
}

public Builder dependencyVersion(final String dependencyVersion) {
this.dependencyVersion = dependencyVersion;
return this;
}

public Builder buildTag(final String buildTag) {
this.buildTag = buildTag;
return this;
}

public Builder buildRevision(final String buildRevision) {
this.buildRevision = buildRevision;
return this;
}

public Builder buildBranch(final String buildBranch) {
this.buildBranch = buildBranch;
return this;
}

public Builder buildTimestamp(final String buildTimestamp) {
this.buildTimestamp = buildTimestamp;
return this;
}

public Builder buildJdk(final String buildJdk) {
this.buildJdk = buildJdk;
return this;
}

public Builder builtBy(final String builtBy) {
this.builtBy = builtBy;
return this;
}

public NarManifest build() {
return new NarManifest(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,15 @@ public interface NarPersistenceProvider {
*/
void initialize(NarPersistenceProviderInitializationContext initializationContext);

/**
* Called prior to NiFi shutting down.
*/
void shutdown();

/**
* Saves the contents of a NAR contained in the given input stream.
*
* @param narCoordinate the coordinate that was extracted from the NAR input stream
* @param inputStream the input stream containing the contents of the NAR
* @return the File where the NAR content was stored
*
* @throws IOException if an error occurs writing the NAR contents to the filesystem
*/
File saveNar(BundleCoordinate narCoordinate, InputStream inputStream) throws IOException;
NarFile saveNar(InputStream inputStream) throws IOException;

/**
* Deletes the contents of the NAR with the given coordinate.
Expand Down Expand Up @@ -86,4 +80,9 @@ public interface NarPersistenceProvider {
*/
Map<BundleCoordinate, File> getNarFiles();

/**
* Called prior to NiFi shutting down.
*/
void shutdown();

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarFile;
import org.apache.nifi.nar.NarPersistenceProvider;
import org.apache.nifi.nar.NarPersistenceProviderInitializationContext;
import org.apache.nifi.nar.NarThreadContextClassLoader;
Expand Down Expand Up @@ -1425,9 +1426,9 @@ public void shutdown() {
}

@Override
public File saveNar(final BundleCoordinate narCoordinate, final InputStream inputStream) throws IOException {
public NarFile saveNar(final InputStream inputStream) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalClassLoader)) {
return originalInstance.saveNar(narCoordinate, inputStream);
return originalInstance.saveNar(inputStream);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.util.FileUtils;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.reporting.ReportingTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -50,6 +53,14 @@ public class StandardNarManager implements NarManager, InitializingBean {
private static final String TEMP_FILE_NIFI_PREFIX = "nifi";
private static final String TEMP_FILE_NAR_SUFFIX = "nar";

private static final Set<Class<?>> ALLOWED_EXTENSION_TYPES = Set.of(
Processor.class,
ControllerService.class,
ReportingTask.class,
FlowRegistryClient.class,
FlowAnalysisRule.class
);

private final FlowManager flowManager;
private final ExtensionManager extensionManager;
private final NarPersistenceProvider persistenceProvider;
Expand All @@ -74,29 +85,20 @@ public void afterPropertiesSet() {

@Override
public synchronized BundleCoordinate addNar(final String filename, final InputStream inputStream) throws IOException {
final File tempFile = Files.createTempFile(TEMP_FILE_NIFI_PREFIX, TEMP_FILE_NAR_SUFFIX).toFile();
LOGGER.debug("Created temporary file {} to hold contents of NAR for {}", tempFile.getAbsolutePath(), filename);
try {
Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);

final BundleCoordinate coordinate = NarBundleUtil.extractCoordinate(tempFile);
if (extensionManager.getBundle(coordinate) != null && !persistenceProvider.exists(coordinate)) {
throw new IllegalStateException("Another NAR is registered with the same coordinate and can not be replaced because it is not part of the NAR Manager");
}
try (final InputStream tempFileInputStream = new FileInputStream(tempFile)) {
final File narFile = persistenceProvider.saveNar(coordinate, tempFileInputStream);
//TODO limit loading to only (processor, CS, reporting task, registry client, flow analysis rule)
narLoader.load(Collections.singleton(narFile));
}
return coordinate;
} finally {
final boolean successfulDelete = tempFile.delete();
if (successfulDelete) {
LOGGER.debug("Deleted temporary file {} that was created to hold contents of NAR {}", tempFile.getAbsolutePath(), filename);
} else {
LOGGER.warn("Failed to delete temporary file {}. This file should be cleaned up manually", tempFile.getAbsolutePath());
final NarFile narFile = persistenceProvider.saveNar(inputStream);
final NarManifest manifest = narFile.getManifest();
final BundleCoordinate coordinate = new BundleCoordinate(manifest.getGroup(), manifest.getId(), manifest.getVersion());
if (extensionManager.getBundle(coordinate) != null && !persistenceProvider.exists(coordinate)) {
try {
persistenceProvider.deleteNar(coordinate);
} catch (final Exception e) {
LOGGER.warn("Failed to delete NAR file [{}]", coordinate, e);
}
throw new IllegalStateException("Another NAR is registered with the same coordinate and can not be replaced because it is not part of the NAR Manager");
}

narLoader.load(Collections.singleton(narFile.getFile()), ALLOWED_EXTENSION_TYPES);
return coordinate;
}

@Override
Expand Down
Loading

0 comments on commit 983fc41

Please sign in to comment.