Skip to content

Commit

Permalink
Refator - move calls behind service facade, add NarLoaderHoader and i…
Browse files Browse the repository at this point in the history
…nject to NarManager
  • Loading branch information
bbende committed May 21, 2024
1 parent 2a39f69 commit 190b240
Show file tree
Hide file tree
Showing 20 changed files with 168 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public interface NarManager {

BundleCoordinate saveNar(String filename, InputStream inputStream) throws IOException;
BundleCoordinate addNar(String filename, InputStream inputStream) throws IOException;

boolean deleteNar(BundleCoordinate narCoordinate) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
<artifactId>nifi-framework-nar-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-nar-loading-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.apache.nifi.registry</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ public class StandardNarManager implements NarManager {
private final FlowManager flowManager;
private final ExtensionManager extensionManager;
private final NarPersistenceProvider persistenceProvider;
private final NarLoader narLoader;

public StandardNarManager(final FlowController flowController) {
public StandardNarManager(final FlowController flowController, final NarLoader narLoader) {
this.flowManager = flowController.getFlowManager();
this.extensionManager = flowController.getExtensionManager();
this.persistenceProvider = flowController.getNarPersistenceProvider();
this.narLoader = narLoader;
}

@Override
public BundleCoordinate saveNar(final String filename, final InputStream inputStream) throws IOException {
public BundleCoordinate addNar(final String filename, final InputStream inputStream) throws IOException {
final File tempFile = Files.createTempFile("nifi", "nar").toFile();
LOGGER.debug("Created temporary file {} to hold contents of NAR for {}", tempFile.getAbsolutePath(), filename);
try {
Expand All @@ -67,6 +69,7 @@ public BundleCoordinate saveNar(final String filename, final InputStream inputSt

@Override
public boolean deleteNar(final BundleCoordinate narCoordinate) throws IOException {
// TODO verify the bundle exists, no components are instantiate from it, then remove from extension manager
return persistenceProvider.deleteNar(narCoordinate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ public void initialize(final NarPersistenceProviderInitializationContext initial
}

private void loadNarInformation() {
// TODO what if a NAR is copied into this location manually and doesn't match the filename format being used here?
// we'll end up loading the NAR info, but then a call to delete or stream would fail
final File[] files = storageLocation.listFiles(f -> f.isFile() && f.getName().endsWith(NAR_FILENAME_EXTENSION));
if (files == null) {
LOGGER.info("No existing NARs found at [{}]", storageLocation.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.spring;

import org.apache.nifi.nar.NarLoader;
import org.apache.nifi.nar.NarLoaderHolder;
import org.springframework.beans.factory.FactoryBean;

/**
* Factory bean that provides the NarLoader instance from the holder.
*/
public class NarLoaderFactoryBean implements FactoryBean<NarLoader> {

@Override
public NarLoader getObject() throws Exception {
return NarLoaderHolder.getNarLoader();
}

@Override
public Class<?> getObjectType() {
return NarLoader.class;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@
</bean>

<!-- extension manager -->
<bean id="extensionManager" class="org.apache.nifi.spring.ExtensionManagerFactoryBean">
</bean>
<bean id="extensionManager" class="org.apache.nifi.spring.ExtensionManagerFactoryBean"/>

<bean id="ruleViolationsManager" class="org.apache.nifi.validation.StandardRuleViolationsManager">
</bean>
<bean id="ruleViolationsManager" class="org.apache.nifi.validation.StandardRuleViolationsManager"/>

<!-- flow controller -->
<bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean">
Expand Down Expand Up @@ -89,8 +87,11 @@
<property name="statusHistoryRepository" ref="statusHistoryRepository" />
</bean>

<bean id="narLoader" class="org.apache.nifi.spring.NarLoaderFactoryBean"/>

<bean id="narManager" class="org.apache.nifi.nar.StandardNarManager">
<constructor-arg ref="flowController" />
<constructor-arg ref="flowController"/>
<constructor-arg ref="narLoader"/>
</bean>

</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB

# NAR Persistence Provider Properties
nifi.nar.persistence.provider.properties.directory=./extensions
nifi.nar.persistence.provider.properties.directory=./nar_storage

# Site to Site properties
nifi.remote.input.socket.port=9990
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.nar;

public class NarLoaderHolder {

private static volatile NarLoader INSTANCE;

public static void init(final NarLoader narLoader) {
if (INSTANCE == null) {
synchronized (NarLoader.class) {
if (INSTANCE == null) {
INSTANCE = narLoader;
} else {
throw new IllegalStateException("Cannot reinitialize NarLoaderHolder");
}
}
} else {
throw new IllegalStateException("Cannot reinitialize NarLoaderHolder");
}
}

public static NarLoader getNarLoader() {
if (INSTANCE == null) {
synchronized (NarLoader.class) {
if (INSTANCE == null) {
throw new IllegalStateException("NarLoaderHolder was never initialized");
}
}
}

return INSTANCE;
}

// Private access
private NarLoaderHolder() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@

<!-- NAR Manager properties -->
<nifi.nar.persistence.provider.implementation>org.apache.nifi.nar.StandardNarPersistenceProvider</nifi.nar.persistence.provider.implementation>
<nifi.nar.persistence.provider.properties.directory>./extensions</nifi.nar.persistence.provider.properties.directory>
<nifi.nar.persistence.provider.properties.directory>./nar_storage</nifi.nar.persistence.provider.properties.directory>

<!-- nifi.properties: web properties -->
<nifi.web.http.host />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@
<artifactId>nifi-framework-core-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-nar-loading-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-external-resource-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.nifi.nar.NarAutoLoader;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.nar.NarLoader;
import org.apache.nifi.nar.NarLoaderHolder;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.nar.NarUnpackMode;
import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
Expand Down Expand Up @@ -758,7 +759,7 @@ public void start() {
.targetDirectory(new File(props.getProperty(NiFiProperties.NAR_LIBRARY_AUTOLOAD_DIRECTORY, NiFiProperties.DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR)))
.conflictResolutionStrategy(props.getProperty(NAR_PROVIDER_CONFLICT_RESOLUTION, DEFAULT_NAR_PROVIDER_CONFLICT_RESOLUTION))
.pollInterval(props.getProperty(NAR_PROVIDER_POLL_INTERVAL_PROPERTY, DEFAULT_NAR_PROVIDER_POLL_INTERVAL))
.restrainingStartup(Boolean.parseBoolean(props.getProperty(NAR_PROVIDER_RESTRAIN_PROPERTY, "true")))
.restrainingStartup(Boolean.parseBoolean(props.getProperty(NAR_PROVIDER_RESTRAIN_PROPERTY, "true")))
.build();
narProviderService.start();

Expand All @@ -774,6 +775,9 @@ public void start() {
this,
unpackMode);

// Set the NarLoader into the holder which makes it available to the Spring context via a factory bean
NarLoaderHolder.init(narLoader);

narAutoLoader = new NarAutoLoader(props, narLoader);
narAutoLoader.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.BundleEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
import org.apache.nifi.web.api.entity.ConfigurationAnalysisEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
Expand Down Expand Up @@ -144,6 +145,8 @@
import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
import org.apache.nifi.web.api.request.FlowMetricsRegistry;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -2821,4 +2824,10 @@ ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingC
* @return rule violations produced by the analysis of the process group
*/
FlowAnalysisResultEntity getFlowAnalysisResult(String processGroupId);

// ----------------------------------------
// NAR Management methods
// ----------------------------------------

BundleEntity addNar(String filename, InputStream inputStream) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ public Object populateLock(ProceedingJoinPoint proceedingJoinPoint) throws Throw
return proceedWithWriteLock(proceedingJoinPoint);
}

@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* add*(..))")
public Object addLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return proceedWithWriteLock(proceedingJoinPoint);
}

@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* get*(..))")
public Object getLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import org.apache.nifi.history.PreviousValue;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextLookup;
Expand Down Expand Up @@ -286,6 +287,7 @@
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.BundleEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
import org.apache.nifi.web.api.entity.ConfigurationAnalysisEntity;
Expand Down Expand Up @@ -380,6 +382,7 @@
import org.springframework.security.oauth2.core.OAuth2Token;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -462,8 +465,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private final ClusterMetricsRegistry clusterMetricsRegistry = new ClusterMetricsRegistry();

private RuleViolationsManager ruleViolationsManager;

private PredictionBasedParallelProcessingService parallelProcessingService;
private NarManager narManager;

// -----------------------------------------
// Synchronization methods
Expand Down Expand Up @@ -6562,6 +6565,13 @@ public FlowAnalysisResultEntity createFlowAnalysisResultEntity(Collection<RuleVi
return entity;
}

@Override
public BundleEntity addNar(final String filename, final InputStream inputStream) throws IOException {
final BundleCoordinate narCoordinate = narManager.addNar(filename, inputStream);
final BundleDTO bundleDTO = dtoFactory.createBundleDto(narCoordinate);
return entityFactory.createBundleEntity(bundleDTO);
}

private PermissionsDTO createPermissionDto(
final String id,
final org.apache.nifi.flow.ComponentType subjectComponentType,
Expand Down Expand Up @@ -6813,4 +6823,8 @@ public void setRuleViolationsManager(RuleViolationsManager ruleViolationsManager
public void setParallelProcessingService(PredictionBasedParallelProcessingService parallelProcessingService) {
this.parallelProcessingService = parallelProcessingService;
}

public void setNarManager(final NarManager narManager) {
this.narManager = narManager;
}
}
Loading

0 comments on commit 190b240

Please sign in to comment.