Skip to content

Commit

Permalink
Initial setup of API and creation in FlowController
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed May 14, 2024
1 parent 13e281d commit f4bff52
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ public class NiFiProperties extends ApplicationProperties {
// kubernetes properties
public static final String CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX = "nifi.cluster.leader.election.kubernetes.lease.prefix";

// nar manager properties
public static final String NAR_MANAGER_PREFIX = "nifi.nar.manager.";
public static final String NAR_MANAGER_IMPLEMENTATION_CLASS = NAR_MANAGER_PREFIX + "implementation";
public static final String DEFAULT_NAR_MANAGER_IMPLEMENTATION_CLASS = "org.apache.nifi.nar.StandardNarManager";

public static final String DEFAULT_PYTHON_WORKING_DIRECTORY = "./work/python";

// automatic diagnostic defaults
Expand Down Expand Up @@ -1818,6 +1823,22 @@ public List<File> getPythonExtensionsDirectories() {
.collect(Collectors.toList());
}

/**
* @return the name of the class to use for the NarManager implementation
*/
public String getNarManagerImplementationClass() {
return getProperty(NAR_MANAGER_IMPLEMENTATION_CLASS, DEFAULT_NAR_MANAGER_IMPLEMENTATION_CLASS);
}

/**
* @return the properties that start with "nifi.nar.manager", excluding the implementation class property
*/
public Map<String, String> getNarManagerProperties() {
final Map<String, String> properties = getPropertiesWithPrefix(NAR_MANAGER_PREFIX);
properties.remove(NAR_MANAGER_IMPLEMENTATION_CLASS);
return properties;
}

/**
* Returns all properties where the property key starts with the prefix.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import org.apache.nifi.bundle.BundleCoordinate;

import java.io.IOException;
import java.io.InputStream;
import java.util.Set;

public interface NarManager {

void initialize(NarManagerInitializationContext initializationContext) throws IOException;

void shutdown();

BundleCoordinate addNar(String filename, InputStream inputStream);

Set<BundleCoordinate> getNars();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

public class NarManagerException extends RuntimeException {

public NarManagerException(final String message) {
super(message);
}

public NarManagerException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import org.apache.nifi.components.PropertyValue;

import java.util.Map;

public interface NarManagerInitializationContext {

/**
* @return the properties that have been configured for the NarManager
*/
Map<String, String> getProperties();

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarManager;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.nar.PythonBundle;
import org.apache.nifi.nar.StandardNarManagerInitializationContext;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterProvider;
Expand Down Expand Up @@ -208,6 +210,7 @@
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.concurrency.TimedLock;
import org.apache.nifi.validation.RuleViolationsManager;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
Expand Down Expand Up @@ -297,6 +300,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
private final StateManagerProvider stateManagerProvider;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
private final RevisionManager revisionManager;
private final NarManager narManager;

private final ConnectionLoadBalanceServer loadBalanceServer;
private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
Expand Down Expand Up @@ -541,6 +545,12 @@ private FlowController(
throw new RuntimeException(e);
}

try {
this.narManager = createNarManager(nifiProperties);
} catch (final Exception e) {
throw new RuntimeException("Unable to create NAR Manager", e);
}

lifecycleStateManager = new StandardLifecycleStateManager();
processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties, lifecycleStateManager);

Expand Down Expand Up @@ -1376,6 +1386,27 @@ private ProvenanceRepository createProvenanceRepository(final NiFiProperties pro
}
}

private NarManager createNarManager(final NiFiProperties nifiProperties) {
final String implementationClassName = nifiProperties.getNarManagerImplementationClass();
if (StringUtils.isBlank(implementationClassName)) {
throw new RuntimeException("Cannot create NAR Manager because NiFi Properties is missing the following property: " + NiFiProperties.NAR_MANAGER_IMPLEMENTATION_CLASS);
}

final Map<String, String> narManagerProperties = nifiProperties.getNarManagerProperties();
final Map<String, String> initializationProperties = narManagerProperties.entrySet().stream()
.map(entry -> new Tuple<>(entry.getKey().replace(NiFiProperties.NAR_MANAGER_PREFIX, ""), entry.getValue()))
.collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));

LOG.info("Creating NAR Manager [{}]", implementationClassName);
try {
final NarManager narManager = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, NarManager.class, nifiProperties);
narManager.initialize(new StandardNarManagerInitializationContext(initializationProperties));
return narManager;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

public KerberosConfig createKerberosConfig(final NiFiProperties nifiProperties) {
final String principal = nifiProperties.getKerberosServicePrincipal();
final String keytabLocation = nifiProperties.getKerberosServiceKeytabLocation();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Set;

public class StandardNarManager implements NarManager {

private static final Logger LOGGER = LoggerFactory.getLogger(StandardNarManager.class);

private static final String DIRECTORY_PROPERTY = "directory";

private volatile String directory;

@Override
public void initialize(final NarManagerInitializationContext initializationContext) throws IOException {
directory = getRequiredValue(initializationContext, DIRECTORY_PROPERTY);
// TODO ensure location exists
LOGGER.info("Initialization completed - NARs will be stored at [{}]", directory);
}

@Override
public void shutdown() {
// TODO
}

@Override
public BundleCoordinate addNar(final String filename, final InputStream inputStream) {
// TODO
return null;
}

@Override
public Set<BundleCoordinate> getNars() {
// TODO
return null;
}

private String getRequiredValue(final NarManagerInitializationContext initializationContext, final String property) {
final Map<String, String> properties = initializationContext.getProperties();
final String value = properties.get(property);
if (StringUtils.isBlank(value)) {
throw new IllegalStateException("Missing required property: " + property);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

import java.util.Collections;
import java.util.Map;

public class StandardNarManagerInitializationContext implements NarManagerInitializationContext {

private final Map<String, String> properties;

public StandardNarManagerInitializationContext(final Map<String, String> properties) {
this.properties = properties == null ? Collections.emptyMap() : Map.copyOf(properties);
}

@Override
public Map<String, String> getProperties() {
return properties;
}

}

0 comments on commit f4bff52

Please sign in to comment.