Skip to content

Commit

Permalink
NAR Manager - Initial setup of API, NiFi Properties, creation in Flow…
Browse files Browse the repository at this point in the history
…Controller, and Spring config
  • Loading branch information
bbende committed May 15, 2024
1 parent ea48d22 commit afefde8
Show file tree
Hide file tree
Showing 20 changed files with 394 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ public class NiFiProperties extends ApplicationProperties {
// kubernetes properties
public static final String CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX = "nifi.cluster.leader.election.kubernetes.lease.prefix";

// nar manager properties
public static final String NAR_MANAGER_IMPLEMENTATION_CLASS = "nifi.nar.manager.implementation";
public static final String NAR_MANAGER_PROPERTIES_PREFIX = "nifi.nar.manager.properties.";

public static final String DEFAULT_PYTHON_WORKING_DIRECTORY = "./work/python";

// automatic diagnostic defaults
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import org.apache.nifi.bundle.BundleCoordinate;

import java.io.IOException;
import java.io.InputStream;
import java.util.Set;

public interface NarManager {

void initialize(NarManagerInitializationContext initializationContext) throws IOException;

void shutdown();

BundleCoordinate addNar(String filename, InputStream inputStream) throws IOException;

Set<BundleCoordinate> getNars();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

public class NarManagerException extends RuntimeException {

public NarManagerException(final String message) {
super(message);
}

public NarManagerException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import java.util.Map;

public interface NarManagerInitializationContext {

/**
* @return the properties that have been configured for the NarManager
*/
Map<String, String> getProperties();

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,12 @@
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarManager;
import org.apache.nifi.nar.NarManagerInitializationContext;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.nar.PythonBundle;
import org.apache.nifi.nar.StandardNarManager;
import org.apache.nifi.nar.StandardNarManagerInitializationContext;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterProvider;
Expand Down Expand Up @@ -208,6 +212,7 @@
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.concurrency.TimedLock;
import org.apache.nifi.validation.RuleViolationsManager;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
Expand Down Expand Up @@ -261,6 +266,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository";
public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository";
public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager";
public static final String DEFAULT_NAR_MANAGER_IMPLEMENTATION = StandardNarManager.class.getName();

private static final String ENCRYPTED_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository";
private static final String ENCRYPTED_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository";
Expand Down Expand Up @@ -297,6 +303,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
private final StateManagerProvider stateManagerProvider;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
private final RevisionManager revisionManager;
private final NarManager narManager;

private final ConnectionLoadBalanceServer loadBalanceServer;
private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
Expand Down Expand Up @@ -546,6 +553,7 @@ private FlowController(

parameterContextManager = new StandardParameterContextManager();
repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
narManager = createNarManager(nifiProperties);

this.flowAnalysisThreadPool = new FlowEngine(1, "Background Flow Analysis", true);
if (ruleViolationsManager != null) {
Expand Down Expand Up @@ -1376,6 +1384,61 @@ private ProvenanceRepository createProvenanceRepository(final NiFiProperties pro
}
}

private NarManager createNarManager(final NiFiProperties nifiProperties) {
final String implementationClassName = nifiProperties.getProperty(NiFiProperties.NAR_MANAGER_IMPLEMENTATION_CLASS, DEFAULT_NAR_MANAGER_IMPLEMENTATION);
if (StringUtils.isBlank(implementationClassName)) {
throw new RuntimeException("Cannot create NAR Manager because NiFi Properties is missing the following property: " + NiFiProperties.NAR_MANAGER_IMPLEMENTATION_CLASS);
}

LOG.info("Creating NAR Manager [{}]", implementationClassName);
try {
final NarManager narManager = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, NarManager.class, nifiProperties);

final Map<String, String> initializationProperties = nifiProperties.getPropertiesWithPrefix(NiFiProperties.NAR_MANAGER_PROPERTIES_PREFIX).entrySet().stream()
.map(entry -> new Tuple<>(entry.getKey().replace(NiFiProperties.NAR_MANAGER_PROPERTIES_PREFIX, ""), entry.getValue()))
.collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));

final NarManagerInitializationContext initializationContext = new StandardNarManagerInitializationContext(initializationProperties);

final ClassLoader narManagerClassLoader = narManager.getClass().getClassLoader();
final NarManager wrappedNarManager = new NarManager() {
@Override
public void initialize(final NarManagerInitializationContext initializationContext) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(narManagerClassLoader)) {
narManager.initialize(initializationContext);
}
}

@Override
public void shutdown() {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(narManagerClassLoader)) {
narManager.shutdown();
}
}

@Override
public BundleCoordinate addNar(final String filename, final InputStream inputStream) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(narManagerClassLoader)) {
return narManager.addNar(filename, inputStream);
}
}

@Override
public Set<BundleCoordinate> getNars() {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(narManagerClassLoader)) {
return narManager.getNars();
}
}
};


wrappedNarManager.initialize(initializationContext);
return wrappedNarManager;
} catch (final Exception e) {
throw new RuntimeException("Failed to create NAR Manager", e);
}
}

public KerberosConfig createKerberosConfig(final NiFiProperties nifiProperties) {
final String principal = nifiProperties.getKerberosServicePrincipal();
final String keytabLocation = nifiProperties.getKerberosServiceKeytabLocation();
Expand Down Expand Up @@ -2207,6 +2270,10 @@ public ClusterCoordinator getClusterCoordinator() {
return clusterCoordinator;
}

public NarManager getNarManager() {
return narManager;
}

/**
* Creates a connection between two Connectable objects.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.util.FileUtils;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Set;

public class StandardNarManager implements NarManager {

private static final Logger LOGGER = LoggerFactory.getLogger(StandardNarManager.class);

private static final String DIRECTORY_PROPERTY = "directory";

private volatile File directory;

@Override
public void initialize(final NarManagerInitializationContext initializationContext) throws IOException {
final String directoryPropertyValue = getRequiredValue(initializationContext, DIRECTORY_PROPERTY);
final File directory = new File(directoryPropertyValue);
FileUtils.ensureDirectoryExistAndCanReadAndWrite(directory);
this.directory = directory;
LOGGER.info("NarManager initialization completed - NARs will be stored at [{}]", directory.getAbsolutePath());
}

@Override
public void shutdown() {
// TODO
}

@Override
public BundleCoordinate addNar(final String filename, final InputStream inputStream) throws IOException {
final File destFile = new File(directory, filename);
if (destFile.exists()) {
throw new NarManagerException("NAR file with the same name already exists");
}

final File tempFile = new File(directory, "." + filename);
LOGGER.debug("Writing NAR to temp file at [{}]", tempFile.getAbsolutePath());
try (final OutputStream outputStream = new FileOutputStream(tempFile)) {
IOUtils.copy(inputStream, outputStream);
}

// TODO should we extract and verify the file has a MANIFEST, throw exception if not, otherwise use info to return BundleCoordinate?

LOGGER.debug("Moving NAR to final file at [{}]", destFile.getAbsolutePath());
Files.move(tempFile.toPath(), destFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
return null;
}

@Override
public Set<BundleCoordinate> getNars() {
// TODO
return null;
}

private String getRequiredValue(final NarManagerInitializationContext initializationContext, final String property) {
final Map<String, String> properties = initializationContext.getProperties();
final String value = properties.get(property);
if (StringUtils.isBlank(value)) {
throw new IllegalStateException("Missing required property: " + property);
}
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import java.util.Collections;
import java.util.Map;

public class StandardNarManagerInitializationContext implements NarManagerInitializationContext {

private final Map<String, String> properties;

public StandardNarManagerInitializationContext(final Map<String, String> properties) {
this.properties = properties == null ? Collections.emptyMap() : Map.copyOf(properties);
}

@Override
public Map<String, String> getProperties() {
return properties;
}

}
Loading

0 comments on commit afefde8

Please sign in to comment.