Skip to content

Commit

Permalink
Update flow synchronization to account for changes to referenced assets
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Jul 26, 2024
1 parent f0e8889 commit 262a895
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ public void addAffectedComponents(final FlowDifference difference) {
return;
}

if (differenceType == DifferenceType.PARAMETER_VALUE_CHANGED || differenceType == DifferenceType.PARAMETER_DESCRIPTION_CHANGED || differenceType == DifferenceType.PARAMETER_REMOVED) {
if (differenceType == DifferenceType.PARAMETER_VALUE_CHANGED || differenceType == DifferenceType.PARAMETER_ASSET_REFERENCES_CHANGED
|| differenceType == DifferenceType.PARAMETER_DESCRIPTION_CHANGED || differenceType == DifferenceType.PARAMETER_REMOVED) {
addComponentsForParameterUpdate(difference);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -123,6 +124,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.createBundleCoordinate;
Expand Down Expand Up @@ -932,19 +934,26 @@ private void updateParameterContext(
final Map<String, Parameter> parameters = createParameterMap(flowManager, versionedParameterContext, encryptor, assetManager);

final Map<String, String> currentValues = new HashMap<>();
parameterContext.getParameters().values().forEach(param -> currentValues.put(param.getDescriptor().getName(), param.getValue()));
final Map<String, Set<String>> currentAssetReferences = new HashMap<>();
parameterContext.getParameters().values().forEach(param -> {
currentValues.put(param.getDescriptor().getName(), param.getValue());
currentAssetReferences.put(param.getDescriptor().getName(), getAssetIds(param));
});

final Map<String, Parameter> updatedParameters = new HashMap<>();
final Set<String> proposedParameterNames = new HashSet<>();
for (final VersionedParameter parameter : versionedParameterContext.getParameters()) {
final String parameterName = parameter.getName();
final String currentValue = currentValues.get(parameterName);
final Set<String> currentAssetIds = currentAssetReferences.get(parameterName);

proposedParameterNames.add(parameterName);
if (!Objects.equals(currentValue, parameter.getValue())) {
final Parameter updatedParameterObject = parameters.get(parameterName);
final Parameter updatedParameterObject = parameters.get(parameterName);
final String updatedValue = updatedParameterObject.getValue();
final Set<String> updatedAssetIds = getAssetIds(updatedParameterObject);
if (!Objects.equals(currentValue, updatedValue) || !currentAssetIds.equals(updatedAssetIds)) {
updatedParameters.put(parameterName, updatedParameterObject);
}
proposedParameterNames.add(parameterName);
}

// If any parameters are removed, need to add a null value to the map in order to make sure that the parameter is removed.
Expand All @@ -970,6 +979,13 @@ private void updateParameterContext(
parameterContext.setInheritedParameterContexts(referencedContexts);
}

private Set<String> getAssetIds(final Parameter parameter) {
return Stream.ofNullable(parameter.getReferencedAssets())
.flatMap(Collection::stream)
.map(Asset::getIdentifier)
.collect(Collectors.toSet());
}

private void inheritControllerServices(final FlowController controller, final VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) {
final FlowManager flowManager = controller.getFlowManager();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public enum DifferenceType {
*/
PARAMETER_VALUE_CHANGED("Parameter Value Changed"),

/**
* The assets referenced by the Parameter is different in each of the flows
*/
PARAMETER_ASSET_REFERENCES_CHANGED("Parameter Asset References Changed"),

/**
* The description of the Parameter is different in each of the flows
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.nifi.components.PortFunction;
import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.flow.VersionedAsset;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
Expand All @@ -43,12 +44,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StandardFlowComparator implements FlowComparator {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowComparator.class);
Expand Down Expand Up @@ -258,6 +261,21 @@ void compare(final VersionedParameterContext contextA, final VersionedParameterC
differences.add(new StandardFlowDifference(DifferenceType.PARAMETER_VALUE_CHANGED, contextA, contextB, name, valueA, valueB, description));
}

final Set<String> assetIdsA = Stream.ofNullable(parameterA.getReferencedAssets())
.flatMap(Collection::stream)
.map(VersionedAsset::getIdentifier)
.collect(Collectors.toSet());
final Set<String> assetIdsB = Stream.ofNullable(parameterB.getReferencedAssets())
.flatMap(Collection::stream)
.map(VersionedAsset::getIdentifier)
.collect(Collectors.toSet());
if (!assetIdsA.equals(assetIdsB)) {
final List<VersionedAsset> valueA = parameterA.getReferencedAssets();
final List<VersionedAsset> valueB = parameterB.getReferencedAssets();
final String description = differenceDescriptor.describeDifference(DifferenceType.PARAMETER_ASSET_REFERENCES_CHANGED, flowA.getName(), flowB.getName(), contextA, contextB, name, valueA, valueB);
differences.add(new StandardFlowDifference(DifferenceType.PARAMETER_VALUE_CHANGED, contextA, contextB, name, valueA, valueB, description));
}

if (!Objects.equals(parameterA.getDescription(), parameterB.getDescription())) {
final String valueA = parameterA.getDescription();
final String valueB = parameterB.getDescription();
Expand Down
1 change: 1 addition & 0 deletions nifi-system-tests/nifi-system-test-suite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
<exclude>src/test/resources/keystore.jks</exclude>
<exclude>src/test/resources/truststore.jks</exclude>
<exclude>src/test/resources/sample-assets/helloworld.txt</exclude>
<exclude>src/test/resources/sample-assets/helloworld2.txt</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,7 @@ public void deleteAll(final String groupId) throws NiFiClientException, IOExcept
for (final ProcessorEntity processorEntity : flowDto.getProcessors()) {
processorEntity.setDisconnectedNodeAcknowledged(true);
getProcessorClient().deleteProcessor(processorEntity);
logger.info("Deleted processor [{}]", processorEntity.getId());
}

// Delete all Controller Services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public void testSynchronizeAssets() throws NiFiClientException, IOException, Int
getClientUtil().updateProcessorProperties(ingest, Map.of("Filename", "#{fileToIngest}", "Delete File", "false"));
getClientUtil().updateProcessorSchedulingPeriod(ingest, "10 mins");

final AssetEntity asset = getNifiClient().getParamContextClient().createAsset(paramContext.getId(), "helloworld.txt", new File("src/test/resources/sample-assets/helloworld.txt"));
assertNotNull(asset);
// Create an asset and update the parameter to reference the asset
final File assetFile = new File("src/test/resources/sample-assets/helloworld.txt");
final AssetEntity asset = createAsset(paramContext.getId(), assetFile);

final ParameterContextUpdateRequestEntity referenceAssetUpdateRequest = getClientUtil().updateParameterAssetReferences(
paramContext, Map.of("fileToIngest", List.of(asset.getAsset().getId())));
Expand Down Expand Up @@ -112,4 +113,99 @@ public void testSynchronizeAssets() throws NiFiClientException, IOException, Int

waitForValidProcessor(ingest.getId());
}

@Test
public void testSynchronizeAssetsAfterChangingReferences() throws NiFiClientException, IOException, InterruptedException {
waitForAllNodesConnected();

final Map<String, String> paramValues = Map.of("name", "foo", "filesToIngest", "");
final ParameterContextEntity paramContext = getClientUtil().createParameterContext("testSynchronizeAssetsAfterChangingReferences", paramValues);

// Set the Parameter Context on the root Process Group
setParameterContext("root", paramContext);

// Create a Processor and update it to reference Parameter "name"
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorProperties(generateFlowFile, Map.of("Text", "#{filesToIngest}"));
getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 mins");

// Create two assets and update the parameter to reference the assets
final AssetEntity asset1 = createAsset(paramContext.getId(), new File("src/test/resources/sample-assets/helloworld.txt"));
final AssetEntity asset2 = createAsset(paramContext.getId(), new File("src/test/resources/sample-assets/helloworld2.txt"));
final List<String> assetIds = List.of(asset1.getAsset().getId(), asset2.getAsset().getId());

final ParameterContextUpdateRequestEntity referenceAssetUpdateRequest = getClientUtil().updateParameterAssetReferences(paramContext, Map.of("filesToIngest", assetIds));
getClientUtil().waitForParameterContextRequestToComplete(paramContext.getId(), referenceAssetUpdateRequest.getRequest().getRequestId());

// Connect the Generate processor to a Terminate processor and generate flow files
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, terminate, "success");
waitForValidProcessor(generateFlowFile.getId());

getClientUtil().startProcessor(generateFlowFile);
waitForQueueCount(connection.getId(), getNumberOfNodes());

// Verify flow files reference both assets
final String flowFileContents = getClientUtil().getFlowFileContentAsUtf8(connection.getId(), 0);
assertTrue(flowFileContents.contains(asset1.getAsset().getName()));
assertTrue(flowFileContents.contains(asset2.getAsset().getName()));

// Verify the contents of the assets directory for each node
final File node1Dir = getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
final File node1AssetsDir = new File(node1Dir, "assets");
final File node1ContextDir = new File(node1AssetsDir, paramContext.getId());
assertTrue(node1ContextDir.exists());

final File node2Dir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
final File node2AssetsDir = new File(node2Dir, "assets");
final File node2ContextDir = new File(node2AssetsDir, paramContext.getId());
assertTrue(node2ContextDir.exists());

// List assets and verify the expected asset is returned
final AssetsEntity assetListing = assertAssetListing(paramContext.getId(), 2);
assertAssetExists(asset1, assetListing);

// Stop node2 and delete assets
getNiFiInstance().getNodeInstance(2).stop();
FileUtils.deleteFilesInDir(node2AssetsDir, (dir, name) -> true, null, true, true);
assertTrue(node2AssetsDir.delete());
assertFalse(node2AssetsDir.exists());

// Modify the parameter context on node1 to only reference asset1
final ParameterContextEntity retrievedContext = getNifiClient().getParamContextClient().getParamContext(paramContext.getId(), false);
final ParameterContextUpdateRequestEntity changeAssetsUpdateRequest = getClientUtil().updateParameterAssetReferences(
retrievedContext, Map.of("filesToIngest", List.of(asset1.getAsset().getId())));
getClientUtil().waitForParameterContextRequestToComplete(paramContext.getId(), changeAssetsUpdateRequest.getRequest().getRequestId());

// Start node2 again
getNiFiInstance().getNodeInstance(2).start(true);
waitForAllNodesConnected();

// Verify node2 asset directories are back
assertTrue(node2AssetsDir.exists());
assertTrue(node2ContextDir.exists());

// Stop generate processor and clear queue
getClientUtil().stopProcessor(generateFlowFile);
getClientUtil().waitForStoppedProcessor(generateFlowFile.getId());

getClientUtil().startProcessor(terminate);
waitForQueueCount(connection.getId(), 0);

getClientUtil().stopProcessor(terminate);
getClientUtil().waitForStoppedProcessor(terminate.getId());

// Start generate again
getClientUtil().startProcessor(generateFlowFile);
waitForQueueCount(connection.getId(), getNumberOfNodes());

// Verify flow files only reference the first asset
final String flowFile1Contents = getClientUtil().getFlowFileContentAsUtf8(connection.getId(), 0);
assertTrue(flowFile1Contents.contains(asset1.getAsset().getName()));
assertFalse(flowFile1Contents.contains(asset2.getAsset().getName()));

final String flowFile2Contents = getClientUtil().getFlowFileContentAsUtf8(connection.getId(), 1);
assertTrue(flowFile2Contents.contains(asset1.getAsset().getName()));
assertFalse(flowFile2Contents.contains(asset2.getAsset().getName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,30 @@ private void waitForStoppedProcessor(final String processorId) throws Interrupte
getClientUtil().waitForStoppedProcessor(processorId);
}

/**
* This method performs multiple attempts of creating the asset due to an intermittent issue with OkHttp client:
* https://github.com/square/okhttp/issues/5390
*
* This issue happens intermittently during clustered system tests that create an asset which then replicates the asset to the cluster nodes.
*/
protected AssetEntity createAsset(final String paramContextId, final File assetFile) throws NiFiClientException {
int count = 0;
int numRetries = 3;
do {
try {
final String assetName = assetFile.getName();
final AssetEntity asset = getNifiClient().getParamContextClient().createAsset(paramContextId, assetName, assetFile);
logger.info("Created asset [{}] in parameter context [{}] on attempt {}", assetName, paramContextId, count);
assertAsset(asset, assetName);
return asset;
} catch (final NiFiClientException | IOException e) {
logger.warn("Failed to create asset [{}] on attempt {}", assetFile.getName(), count, e);
}
} while (count++ < numRetries);

throw new NiFiClientException("Failed to create asset [%s] after %s attempts".formatted(assetFile.getName(), numRetries));
}

protected AssetsEntity assertAssetListing(final String paramContextId, final int expectedCount) throws NiFiClientException, IOException {
final AssetsEntity assetListing = getNifiClient().getParamContextClient().getAssets(paramContextId);
assertNotNull(assetListing);
Expand All @@ -1028,4 +1052,5 @@ protected void assertAsset(final AssetEntity asset, final String expectedName) {
assertNotNull(asset.getAsset().getDigest());
assertEquals(expectedName, asset.getAsset().getName());
}

}

0 comments on commit 262a895

Please sign in to comment.