Skip to content

Commit

Permalink
NIFI-12973 Add Process Group scope to Flow Analysis rules
Browse files Browse the repository at this point in the history
  • Loading branch information
tpalfy committed May 2, 2024
1 parent b6a8699 commit 4ff41ba
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class VersionedFlowAnalysisRule extends VersionedConfigurableExtension {
private ScheduledState scheduledState;
private EnforcementPolicy enforcementPolicy;

private String scope;

@Schema(description = "How to handle violations.")
public EnforcementPolicy getEnforcementPolicy() {
return enforcementPolicy;
Expand All @@ -34,6 +36,14 @@ public void setEnforcementPolicy(EnforcementPolicy enforcementPolicy) {
this.enforcementPolicy = enforcementPolicy;
}

public String getScope() {
return scope;
}

public void setScope(String scope) {
this.scope = scope;
}

@Override
public ComponentType getComponentType() {
return ComponentType.FLOW_ANALYSIS_RULE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class FlowAnalysisRuleDTO extends ComponentDTO {
private Boolean supportsSensitiveDynamicProperties;

private String enforcementPolicy;
private String scope;

private Map<String, String> properties;
private Map<String, PropertyDescriptorDTO> descriptors;
Expand Down Expand Up @@ -208,6 +209,14 @@ public void setEnforcementPolicy(String enforcementPolicy) {
this.enforcementPolicy = enforcementPolicy;
}

public String getScope() {
return scope;
}

public void setScope(String scope) {
this.scope = scope;
}

/**
* @return flow analysis rule's properties
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public abstract class AbstractFlowAnalysisRuleNode extends AbstractComponentNode
private volatile String comment;
private EnforcementPolicy enforcementPolicy;

private String scope;

private volatile FlowAnalysisRuleState state = FlowAnalysisRuleState.DISABLED;

public AbstractFlowAnalysisRuleNode(final LoggableComponent<FlowAnalysisRule> flowAnalysisRule, final String id,
Expand Down Expand Up @@ -114,6 +116,16 @@ public void setEnforcementPolicy(EnforcementPolicy enforcementPolicy) {
this.enforcementPolicy = enforcementPolicy;
}

@Override
public String getScope() {
return scope;
}

@Override
public void setScope(String scope) {
this.scope = scope;
}

@Override
public ConfigurableComponent getComponent() {
return flowAnalysisRuleRef.get().getFlowAnalysisRule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3624,6 +3624,7 @@ private void updateFlowAnalysisRule(final FlowAnalysisRuleNode flowAnalysisRule,
flowAnalysisRule.setName(proposed.getName());
flowAnalysisRule.setComments(proposed.getComments());
flowAnalysisRule.setEnforcementPolicy(proposed.getEnforcementPolicy());
flowAnalysisRule.setScope(proposed.getScope());

if (!isEqual(flowAnalysisRule.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ public VersionedFlowAnalysisRule mapFlowAnalysisRule(final FlowAnalysisRuleNode
versionedRule.setProperties(mapProperties(flowAnalysisRuleNode, serviceProvider));
versionedRule.setPropertyDescriptors(mapPropertyDescriptors(flowAnalysisRuleNode, serviceProvider, Collections.emptySet(), Collections.emptyMap()));
versionedRule.setEnforcementPolicy(flowAnalysisRuleNode.getEnforcementPolicy());
versionedRule.setScope(flowAnalysisRuleNode.getScope());
versionedRule.setType(flowAnalysisRuleNode.getCanonicalClassName());
versionedRule.setScheduledState(flowMappingOptions.getStateLookup().getState(flowAnalysisRuleNode));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public interface FlowAnalysisRuleNode extends ComponentNode {
*/
EnforcementPolicy getEnforcementPolicy();

String getScope();

void setScope(String scope);

void setFlowAnalysisRule(LoggableComponent<FlowAnalysisRule> flowAnalysisRule);

FlowAnalysisRuleContext getFlowAnalysisRuleContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ private FlowController(
ruleViolationsManager
);
if (flowAnalyzer != null) {
flowAnalyzer.initialize(controllerServiceProvider);
flowAnalyzer.initialize(flowManager, controllerServiceProvider);
}

final CronSchedulingAgent cronSchedulingAgent = new CronSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ private void updateFlowAnalysisRule(final FlowAnalysisRuleNode ruleNode, final V
ruleNode.setName(flowAnalysisRule.getName());
ruleNode.setComments(flowAnalysisRule.getComments());
ruleNode.setEnforcementPolicy(flowAnalysisRule.getEnforcementPolicy());
ruleNode.setScope(flowAnalysisRule.getScope());

final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(ruleNode, flowAnalysisRule);
final Map<String, String> decryptedProperties = decryptProperties(flowAnalysisRule.getProperties(), controller.getEncryptor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.nifi.controller.FlowAnalysisRuleNode;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.StandardFlowManager;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
Expand All @@ -29,13 +30,15 @@
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.validation.RuleViolation;
import org.apache.nifi.validation.RuleViolationsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -57,6 +60,7 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
private final FlowAnalysisRuleProvider flowAnalysisRuleProvider;
private final ExtensionManager extensionManager;

private StandardFlowManager flowManager;
private ControllerServiceProvider controllerServiceProvider;

private volatile boolean flowAnalysisRequired;
Expand All @@ -71,7 +75,11 @@ public StandardFlowAnalyzer(
this.extensionManager = extensionManager;
}

public void initialize(final ControllerServiceProvider controllerServiceProvider) {
public void initialize(
final StandardFlowManager flowManager,
final ControllerServiceProvider controllerServiceProvider
) {
this.flowManager = flowManager;
this.controllerServiceProvider = controllerServiceProvider;
}

Expand Down Expand Up @@ -127,6 +135,7 @@ private void analyzeComponent(VersionedComponent component) {

Set<RuleViolation> violations = flowAnalysisRules.stream()
.filter(FlowAnalysisRuleNode::isEnabled)
.filter(ruleNode -> isWithinScope(ruleNode, component.getGroupIdentifier()))
.flatMap(flowAnalysisRuleNode -> {
String ruleId = flowAnalysisRuleNode.getIdentifier();

Expand Down Expand Up @@ -195,6 +204,7 @@ private void analyzeProcessGroup(

flowAnalysisRules.stream()
.filter(FlowAnalysisRuleNode::isEnabled)
.filter(ruleNode -> isWithinScope(ruleNode, groupId))
.forEach(flowAnalysisRuleNode -> {
String ruleId = flowAnalysisRuleNode.getIdentifier();

Expand Down Expand Up @@ -250,6 +260,28 @@ private void analyzeProcessGroup(
processGroup.getProcessGroups().forEach(childProcessGroup -> analyzeProcessGroup(childProcessGroup, flowAnalysisRules, groupViolations, componentToRuleViolations));
}

private boolean isWithinScope(FlowAnalysisRuleNode ruleNode, String groupIdentifier) {
final String ruleScope = ruleNode.getScope();

while (groupIdentifier != null) {
if (ruleScope == null || ruleScope.isBlank()) {
return true;
}

final HashSet<String> scopedProcessGroupIds = new HashSet<>(Arrays.asList(ruleScope.split("\\s*,\\s*")));
if (scopedProcessGroupIds.contains(groupIdentifier)) {
return true;
}

groupIdentifier = Optional.ofNullable(flowManager.getGroup(groupIdentifier))
.map(ProcessGroup::getParent)
.map(ProcessGroup::getIdentifier)
.orElse(null);
}

return false;
}

private String getDisplayName(VersionedComponent component) {
final String displayName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
Expand All @@ -59,6 +60,7 @@ public class FlowAnalysisRuleAuditor extends NiFiAuditor {
private static final String ANNOTATION_DATA = "Annotation Data";
private static final String EXTENSION_VERSION = "Extension Version";
private static final String ENFORCEMENT_POLICY = "Enforcement Policy";
private static final String SCOPE = "Scope";

/**
* Audits the creation of flow analysis rule via createFlowAnalysisRule().
Expand Down Expand Up @@ -106,6 +108,7 @@ public Object updateFlowAnalysisRuleAdvice(ProceedingJoinPoint proceedingJoinPoi
final Map<String, String> values = extractConfiguredPropertyValues(flowAnalysisRule, flowAnalysisRuleDTO);
final FlowAnalysisRuleState state = flowAnalysisRule.getState();
final EnforcementPolicy enforcementPolicy = flowAnalysisRule.getEnforcementPolicy();
final String scope = flowAnalysisRule.getScope();

// update the flow analysis rule state
final FlowAnalysisRuleNode updatedFlowAnalysisRule = (FlowAnalysisRuleNode) proceedingJoinPoint.proceed();
Expand Down Expand Up @@ -202,6 +205,25 @@ public Object updateFlowAnalysisRuleAdvice(ProceedingJoinPoint proceedingJoinPoi
actions.add(configurationAction);
}

final String updatedScope = flowAnalysisRule.getScope();
if (!Objects.equals(updatedScope, scope)) {
final FlowChangeConfigureDetails actionDetails = new FlowChangeConfigureDetails();
actionDetails.setName(SCOPE);
actionDetails.setValue(String.valueOf(updatedScope));
actionDetails.setPreviousValue(String.valueOf(scope));

final FlowChangeAction configurationAction = new FlowChangeAction();
configurationAction.setUserIdentity(user.getIdentity());
configurationAction.setOperation(Operation.Configure);
configurationAction.setTimestamp(actionTimestamp);
configurationAction.setSourceId(flowAnalysisRule.getIdentifier());
configurationAction.setSourceName(flowAnalysisRule.getName());
configurationAction.setSourceType(Component.FlowAnalysisRule);
configurationAction.setComponentDetails(ruleDetails);
configurationAction.setActionDetails(actionDetails);
actions.add(configurationAction);
}

// determine the new state
final FlowAnalysisRuleState updatedState = flowAnalysisRule.getState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4986,6 +4986,7 @@ public FlowAnalysisRuleDTO createFlowAnalysisRuleDto(FlowAnalysisRuleNode flowAn
final FlowAnalysisRuleDTO dto = new FlowAnalysisRuleDTO();
dto.setId(flowAnalysisRuleNode.getIdentifier());
dto.setEnforcementPolicy(flowAnalysisRuleNode.getEnforcementPolicy().name());
dto.setScope(flowAnalysisRuleNode.getScope());
dto.setName(flowAnalysisRuleNode.getName());
dto.setType(flowAnalysisRuleNode.getCanonicalClassName());
dto.setBundle(createBundleDto(bundleCoordinate));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,17 @@ private void configureFlowAnalysisRule(final FlowAnalysisRuleNode flowAnalysisRu
final String name = flowAnalysisRuleDTO.getName();
final String comments = flowAnalysisRuleDTO.getComments();
final String enforcementPolicy = flowAnalysisRuleDTO.getEnforcementPolicy();
final String scope = flowAnalysisRuleDTO.getScope();
final Map<String, String> properties = flowAnalysisRuleDTO.getProperties();

flowAnalysisRule.pauseValidationTrigger(); // avoid triggering validation multiple times
try {
if (isNotNull(enforcementPolicy)) {
flowAnalysisRule.setEnforcementPolicy(EnforcementPolicy.valueOf(enforcementPolicy));
}
if (isNotNull(scope)) {
flowAnalysisRule.setScope(scope);
}
if (isNotNull(name)) {
flowAnalysisRule.setName(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@
<span id="read-only-flow-analysis-rule-enforcement-policy"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">
Scope
<div class="fa fa-question-circle" alt="Info" title="A comma-separated list of ids of the process groups to which this rule should be limited to."></div>
</div>
<div class="flow-analysis-rule-editable setting-field">
<input type="text" id="flow-analysis-rule-scope" name="flow-analysis-rule-scope"/>
</div>
<div class="flow-analysis-rule-read-only setting-field hidden">
<span id="read-only-flow-analysis-rule-scope"></span>
</div>
</div>
</div>
<div class="clear"></div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@
return true;
}

if ($('#flow-analysis-rule-scope').val() !== entity.component['scope']) {
return true;
}

// defer to the properties
return $('#flow-analysis-rule-properties').propertytable('isSaveRequired');
};
Expand All @@ -127,6 +131,7 @@
flowAnalysisRuleDto['name'] = $('#flow-analysis-rule-name').val();
flowAnalysisRuleDto['comments'] = $('#flow-analysis-rule-comments').val();
flowAnalysisRuleDto['enforcementPolicy'] = enforcementPolicy;
flowAnalysisRuleDto['scope'] = $('#flow-analysis-rule-scope').val();

// set the properties
if ($.isEmptyObject(properties) === false) {
Expand Down Expand Up @@ -475,6 +480,8 @@
}
});

$('#flow-analysis-rule-scope').val(flowAnalysisRule['scope']);

var buttons = [{
buttonText: 'Apply',
color: {
Expand Down Expand Up @@ -650,6 +657,8 @@
}
nfCommon.populateField('read-only-flow-analysis-rule-enforcement-policy', enforcementPolicy);

nfCommon.populateField('read-only-flow-analysis-rule-scope', flowAnalysisRule['scope']);

var buttons = [{
buttonText: 'Ok',
color: {
Expand Down

0 comments on commit 4ff41ba

Please sign in to comment.