diff --git a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java index 4486b5ec40488..09f32d31f4957 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java +++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java @@ -20,7 +20,7 @@ public class VersionedExternalFlowMetadata { private String bucketId; private String flowId; - private int version; + private String version; private String flowName; private String author; private String comments; @@ -42,11 +42,11 @@ public void setFlowIdentifier(final String flowId) { this.flowId = flowId; } - public int getVersion() { + public String getVersion() { return version; } - public void setVersion(final int version) { + public void setVersion(final String version) { this.version = version; } diff --git a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedFlowCoordinates.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedFlowCoordinates.java index 125982297c0ec..d17f1156f0f71 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedFlowCoordinates.java +++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedFlowCoordinates.java @@ -26,7 +26,7 @@ public class VersionedFlowCoordinates { private String storageLocation; private String bucketId; private String flowId; - private int version; + private String version; private Boolean latest; @Schema(description = "The identifier of the Flow Registry that contains the flow") @@ -66,11 +66,11 @@ public void setFlowId(String flowId) { } @Schema(description = "The version of the flow") - public int getVersion() { + public String getVersion() { return version; } - public void setVersion(int version) { + public void setVersion(String version) { this.version = version; } diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java index b701d31b4fb90..aa1c22ffc420f 100644 --- a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java +++ b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java @@ -19,6 +19,7 @@ import org.apache.nifi.components.ConfigurableComponent; import java.io.IOException; +import java.util.Optional; import java.util.Set; /** @@ -165,20 +166,21 @@ public interface FlowRegistryClient extends ConfigurableComponent { * @throws FlowRegistryException If an issue happens during processing the request. * @throws IOException If there is issue with the communication between NiFi and the Flow Registry. */ - RegisteredFlowSnapshot getFlowContents(FlowRegistryClientConfigurationContext context, String bucketId, String flowId, int version) throws FlowRegistryException, IOException; + RegisteredFlowSnapshot getFlowContents(FlowRegistryClientConfigurationContext context, String bucketId, String flowId, String version) throws FlowRegistryException, IOException; /** * Adds the given snapshot to the Flow Registry for the given Flow. * * @param context Configuration context. * @param flowSnapshot The flow snapshot to register. + * @param action The register action * * @return The flow snapshot. * * @throws FlowRegistryException If an issue happens during processing the request. * @throws IOException If there is issue with the communication between NiFi and the Flow Registry. */ - RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot) throws FlowRegistryException, IOException; + RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot, RegisterAction action) throws FlowRegistryException, IOException; /** * Retrieves the set of all versions of the specified flow. @@ -208,5 +210,5 @@ public interface FlowRegistryClient extends ConfigurableComponent { * @throws FlowRegistryException If an issue happens during processing the request. * @throws IOException If there is issue with the communication between NiFi and the Flow Registry. */ - int getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; + Optional getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; } diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisterAction.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisterAction.java new file mode 100644 index 0000000000000..593a302d690bf --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisterAction.java @@ -0,0 +1,25 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.registry.flow; + +public enum RegisterAction { + COMMIT, + FORCE_COMMIT; +} diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisteredFlowSnapshot.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisteredFlowSnapshot.java index 4d0722ebc350d..79bf1a8027019 100644 --- a/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisteredFlowSnapshot.java +++ b/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisteredFlowSnapshot.java @@ -32,6 +32,7 @@ public class RegisteredFlowSnapshot { private Map parameterContexts; private String flowEncodingVersion; private Map parameterProviders; + private boolean latest; public RegisteredFlowSnapshotMetadata getSnapshotMetadata() { return snapshotMetadata; @@ -62,7 +63,7 @@ public String getFlowEncodingVersion() { } public boolean isLatest() { - return flow != null && snapshotMetadata != null && flow.getVersionCount() == getSnapshotMetadata().getVersion(); + return latest; } public void setSnapshotMetadata(final RegisteredFlowSnapshotMetadata snapshotMetadata) { @@ -100,4 +101,8 @@ public Map getParameterProviders() { public void setParameterProviders(final Map parameterProviders) { this.parameterProviders = parameterProviders; } + + public void setLatest(final boolean latest) { + this.latest = latest; + } } diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisteredFlowSnapshotMetadata.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisteredFlowSnapshotMetadata.java index 2db5fa37e6998..ea0f0ad75a743 100644 --- a/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisteredFlowSnapshotMetadata.java +++ b/nifi-api/src/main/java/org/apache/nifi/registry/flow/RegisteredFlowSnapshotMetadata.java @@ -19,7 +19,7 @@ public class RegisteredFlowSnapshotMetadata { private String bucketIdentifier; private String flowIdentifier; - private int version; + private String version; private long timestamp; private String author; private String comments; @@ -32,7 +32,7 @@ public String getFlowIdentifier() { return flowIdentifier; } - public int getVersion() { + public String getVersion() { return version; } @@ -56,7 +56,7 @@ public void setFlowIdentifier(String flowIdentifier) { this.flowIdentifier = flowIdentifier; } - public void setVersion(int version) { + public void setVersion(String version) { this.version = version; } diff --git a/nifi-nar-bundles/nifi-flow-registry-client-bundle/nifi-flow-registry-client-services/src/main/java/org/apache/nifi/registry/flow/NifiRegistryFlowRegistryClient.java b/nifi-nar-bundles/nifi-flow-registry-client-bundle/nifi-flow-registry-client-services/src/main/java/org/apache/nifi/registry/flow/NifiRegistryFlowRegistryClient.java index f3dc5a731dc37..98d7eb16ad59b 100644 --- a/nifi-nar-bundles/nifi-flow-registry-client-bundle/nifi-flow-registry-client-services/src/main/java/org/apache/nifi/registry/flow/NifiRegistryFlowRegistryClient.java +++ b/nifi-nar-bundles/nifi-flow-registry-client-bundle/nifi-flow-registry-client-services/src/main/java/org/apache/nifi/registry/flow/NifiRegistryFlowRegistryClient.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -229,45 +230,62 @@ public Set getFlows(final FlowRegistryClientConfigurationContext @Override public RegisteredFlowSnapshot getFlowContents( - final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version + final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version ) throws FlowRegistryException, IOException { try { final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(context); - final VersionedFlowSnapshot snapshot = snapshotClient.get(bucketId, flowId, version); + final VersionedFlowSnapshot snapshot = snapshotClient.get(bucketId, flowId, Integer.parseInt(version)); if (snapshot == null) { - throw new NoSuchFlowVersionException(String.format("Version %d of flow %s does not exist in bucket %s", version, flowId, bucketId)); + throw new NoSuchFlowVersionException(String.format("Version %s of flow %s does not exist in bucket %s", version, flowId, bucketId)); } - return NifiRegistryUtil.convert(snapshot); + final RegisteredFlowSnapshot registeredFlowSnapshot = NifiRegistryUtil.convert(snapshot); + registeredFlowSnapshot.setLatest(snapshot.getSnapshotMetadata().getVersion() == snapshot.getFlow().getVersionCount()); + return registeredFlowSnapshot; } catch (final NiFiRegistryException e) { throw new FlowRegistryException(e.getMessage(), e); } } @Override - public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot) throws FlowRegistryException, IOException { + public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot, RegisterAction action) + throws FlowRegistryException, IOException { try { + final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata(); + final int snapshotVersion = getRegisteredFlowSnapshotVersion(snapshotMetadata, action); + snapshotMetadata.setVersion(String.valueOf(snapshotVersion)); + final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(context); final VersionedFlowSnapshot versionedFlowSnapshot = snapshotClient.create(NifiRegistryUtil.convert(flowSnapshot)); - final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates(); final String bucketId = versionedFlowSnapshot.getFlow().getBucketIdentifier(); final String flowId = versionedFlowSnapshot.getFlow().getIdentifier(); final int version = (int) versionedFlowSnapshot.getFlow().getVersionCount(); + final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates(); versionedFlowCoordinates.setRegistryId(getIdentifier()); versionedFlowCoordinates.setBucketId(bucketId); versionedFlowCoordinates.setFlowId(flowId); - versionedFlowCoordinates.setVersion(version); + versionedFlowCoordinates.setVersion(String.valueOf(version)); versionedFlowCoordinates.setStorageLocation(getProposedUri(context) + "/nifi-registry-api/buckets/" + bucketId + "/flows/" + flowId + "/versions/" + version); versionedFlowSnapshot.getFlowContents().setVersionedFlowCoordinates(versionedFlowCoordinates); - return NifiRegistryUtil.convert(versionedFlowSnapshot); + + final RegisteredFlowSnapshot registeredFlowSnapshot = NifiRegistryUtil.convert(versionedFlowSnapshot); + registeredFlowSnapshot.setLatest(true); + return registeredFlowSnapshot; } catch (NiFiRegistryException e) { throw new FlowRegistryException(e.getMessage(), e); } } + private static int getRegisteredFlowSnapshotVersion(final RegisteredFlowSnapshotMetadata snapshotMetadata, final RegisterAction action) { + if (RegisterAction.FORCE_COMMIT == action) { + return -1; + } + return snapshotMetadata.getVersion() == null ? 1 : Integer.parseInt(snapshotMetadata.getVersion()) + 1; + } + @Override public Set getFlowVersions( final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId @@ -281,9 +299,10 @@ public Set getFlowVersions( } @Override - public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { + public Optional getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { try { - return (int) getFlowClient(context).get(bucketId, flowId).getVersionCount(); + final int versionCount = (int) getFlowClient(context).get(bucketId, flowId).getVersionCount(); + return versionCount == 0 ? Optional.empty() : Optional.of(String.valueOf(versionCount)); } catch (NiFiRegistryException e) { throw new FlowRegistryException(e.getMessage(), e); } diff --git a/nifi-nar-bundles/nifi-flow-registry-client-bundle/nifi-flow-registry-client-services/src/main/java/org/apache/nifi/registry/flow/NifiRegistryUtil.java b/nifi-nar-bundles/nifi-flow-registry-client-bundle/nifi-flow-registry-client-services/src/main/java/org/apache/nifi/registry/flow/NifiRegistryUtil.java index c49d56f18f53b..6276c573ce5f7 100644 --- a/nifi-nar-bundles/nifi-flow-registry-client-bundle/nifi-flow-registry-client-services/src/main/java/org/apache/nifi/registry/flow/NifiRegistryUtil.java +++ b/nifi-nar-bundles/nifi-flow-registry-client-bundle/nifi-flow-registry-client-services/src/main/java/org/apache/nifi/registry/flow/NifiRegistryUtil.java @@ -77,7 +77,7 @@ static RegisteredFlowSnapshotMetadata convert(final VersionedFlowSnapshotMetadat final RegisteredFlowSnapshotMetadata result = new RegisteredFlowSnapshotMetadata(); result.setBucketIdentifier(metadata.getBucketIdentifier()); result.setFlowIdentifier(metadata.getFlowIdentifier()); - result.setVersion(metadata.getVersion()); + result.setVersion(String.valueOf(metadata.getVersion())); result.setTimestamp(metadata.getTimestamp()); result.setAuthor(metadata.getAuthor()); result.setComments(metadata.getComments()); @@ -88,7 +88,7 @@ static VersionedFlowSnapshotMetadata convert(final RegisteredFlowSnapshotMetadat final VersionedFlowSnapshotMetadata result = new VersionedFlowSnapshotMetadata(); result.setBucketIdentifier(metadata.getBucketIdentifier()); result.setFlowIdentifier(metadata.getFlowIdentifier()); - result.setVersion(metadata.getVersion()); + result.setVersion(Integer.parseInt(metadata.getVersion())); result.setTimestamp(metadata.getTimestamp()); result.setAuthor(metadata.getAuthor()); result.setComments(metadata.getComments()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java index 6648983374a19..e29f76ce30341 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java @@ -38,7 +38,7 @@ public class VersionControlInformationDTO { private String flowId; private String flowName; private String flowDescription; - private Integer version; + private String version; private String storageLocation; private String state; private String stateExplanation; @@ -116,11 +116,11 @@ public void setFlowDescription(String flowDescription) { } @Schema(description = "The version of the flow") - public Integer getVersion() { + public String getVersion() { return version; } - public void setVersion(final Integer version) { + public void setVersion(final String version) { this.version = version; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 7b128fc6bd461..c2f78c870f14a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -346,7 +346,7 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p final String registryId = determineRegistryId(remoteCoordinates); final String bucketId = remoteCoordinates.getBucketId(); final String flowId = remoteCoordinates.getFlowId(); - final int version = remoteCoordinates.getVersion(); + final String version = remoteCoordinates.getVersion(); final String storageLocation = remoteCoordinates.getStorageLocation(); final FlowRegistryClientNode flowRegistry = context.getFlowManager().getFlowRegistryClient(registryId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index c7775c99a8efe..eaf246732425f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -3505,7 +3505,7 @@ public String getRegistryName() { } private boolean isModified() { - if (versionControlInformation.getVersion() == 0) { + if (versionControlInformation.getVersion() == null) { return true; } @@ -3741,7 +3741,7 @@ public void synchronizeWithFlowRegistry(final FlowManager flowManager) { } final VersionedProcessGroup snapshot = vci.getFlowSnapshot(); - if (snapshot == null && vci.getVersion() > 0) { + if (snapshot == null && vci.getVersion() != null) { // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry. // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry. try { @@ -3774,15 +3774,15 @@ public void synchronizeWithFlowRegistry(final FlowManager flowManager) { try { final RegisteredFlow versionedFlow = flowRegistry.getFlow(FlowRegistryClientContextFactory.getAnonymousContext(), vci.getBucketIdentifier(), vci.getFlowIdentifier()); - final int latestVersion = (int) versionedFlow.getVersionCount(); + final String latestVersion = flowRegistry.getLatestVersion(FlowRegistryClientContextFactory.getAnonymousContext(), vci.getBucketIdentifier(), vci.getFlowIdentifier()).orElse(null); vci.setBucketName(versionedFlow.getBucketName()); vci.setFlowName(versionedFlow.getName()); vci.setFlowDescription(versionedFlow.getDescription()); vci.setRegistryName(flowRegistry.getName()); - if (latestVersion == vci.getVersion()) { + if (Objects.equals(latestVersion, vci.getVersion())) { versionControlFields.setStale(false); - if (latestVersion == 0) { + if (latestVersion == null) { LOG.debug("{} does not have any version in the Registry", this); versionControlFields.setLocallyModified(true); } else { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java index 8ce394277808b..3793f60a19961 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java @@ -93,14 +93,15 @@ public RegisteredFlowSnapshot registerFlowSnapshot( final Map parameterContexts, final Map parameterProviderReferences, final String comments, - final int expectedVersion + final String expectedVersion, + final RegisterAction registerAction ) throws FlowRegistryException, IOException { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Snapshot for flow {} is checked for violations before commit", snapshot.getInstanceIdentifier()); } if (analyzeProcessGroupToRegister(snapshot)) { - return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion); + return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion, registerAction); } else { throw new FlowRegistryPreCommitException("There are unresolved rule violations"); } @@ -431,7 +432,7 @@ public Set getFlows(final FlowRegistryClientUserContext context, @Override public FlowSnapshotContainer getFlowContents( - final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows + final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final String version, final boolean fetchRemoteFlows ) throws FlowRegistryException, IOException { return node.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows); } @@ -442,7 +443,7 @@ public Set getFlowVersions(final FlowRegistryCli } @Override - public int getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { + public Optional getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { return node.getLatestVersion(context, bucketId, flowId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java index 769b7b4434779..8d46606349166 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java @@ -223,7 +223,7 @@ public Set getFlows(final FlowRegistryClientUserContext context, @Override public FlowSnapshotContainer getFlowContents( - final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows + final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final String version, final boolean fetchRemoteFlows ) throws FlowRegistryException, IOException { final RegisteredFlowSnapshot flowSnapshot = execute(() -> client.get().getComponent().getFlowContents(getConfigurationContext(context), bucketId, flowId, version)); @@ -252,11 +252,12 @@ public RegisteredFlowSnapshot registerFlowSnapshot( final Map parameterContexts, final Map parameterProviderReferences, final String comments, - final int expectedVersion) throws FlowRegistryException, IOException { + final String expectedVersion, + final RegisterAction registerAction) throws FlowRegistryException, IOException { final RegisteredFlowSnapshot registeredFlowSnapshot = createRegisteredFlowSnapshot( context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion); - return execute(() -> client.get().getComponent().registerFlowSnapshot(getConfigurationContext(context), registeredFlowSnapshot)); + return execute(() -> client.get().getComponent().registerFlowSnapshot(getConfigurationContext(context), registeredFlowSnapshot, registerAction)); } @Override @@ -265,7 +266,7 @@ public Set getFlowVersions(final FlowRegistryCli } @Override - public int getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { + public Optional getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException { return execute(() -> client.get().getComponent().getLatestVersion(getConfigurationContext(context), bucketId, flowId)); } @@ -370,7 +371,7 @@ private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserCon final String storageLocation = coordinates.getStorageLocation(); final String bucketId = coordinates.getBucketId(); final String flowId = coordinates.getFlowId(); - final int version = coordinates.getVersion(); + final String version = coordinates.getVersion(); final List clientNodes = getRegistryClientsForInternalFlow(storageLocation); for (final FlowRegistryClientNode clientNode : clientNodes) { @@ -406,7 +407,7 @@ private RegisteredFlowSnapshot createRegisteredFlowSnapshot( final Map parameterContexts, final Map parameterProviderReferences, final String comments, - final int expectedVersion) { + final String expectedVersion) { final RegisteredFlowSnapshotMetadata metadata = new RegisteredFlowSnapshotMetadata(); metadata.setBucketIdentifier(flow.getBucketIdentifier()); metadata.setFlowIdentifier(flow.getIdentifier()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java index 086895136e166..6eaae7eec9568 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java @@ -32,7 +32,7 @@ public class StandardVersionControlInformation implements VersionControlInformat private volatile String flowName; private volatile String flowDescription; private volatile String storageLocation; - private final int version; + private final String version; private volatile VersionedProcessGroup flowSnapshot; private final VersionedFlowStatus status; @@ -45,7 +45,7 @@ public static class Builder { private String flowName; private String flowDescription; private String storageLocation; - private int version; + private String version; private VersionedProcessGroup flowSnapshot; private VersionedFlowStatus status; @@ -89,7 +89,7 @@ public Builder storageLocation(String storageLocation) { return this; } - public Builder version(int version) { + public Builder version(String version) { this.version = version; return this; } @@ -149,7 +149,7 @@ public StandardVersionControlInformation build() { } - public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version, + public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final String version, final String storageLocation, final VersionedProcessGroup snapshot, final VersionedFlowStatus status) { this.registryIdentifier = registryId; this.registryName = registryName; @@ -223,7 +223,7 @@ public void setStorageLocation(String storageLocation) { } @Override - public int getVersion() { + public String getVersion() { return version; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java index 9ff0991dea590..191d93941ee82 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java @@ -47,7 +47,7 @@ class FlowAnalyzingRegistryClientNodeTest { private final static String INSTANCE_IDENTIFIER = UUID.randomUUID().toString(); private final static String COMMENT_TEXT = "comment"; - private final static int EXPECTED_VERSION = 3; + private final static String EXPECTED_VERSION = "3"; @Mock FlowRegistryClientNode node; @@ -103,11 +103,13 @@ public void allowFlowRegistrationWhenNoEnforcingViolationFound() throws IOExcept Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.emptyList()); final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper); - testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); + testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), + COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT); Mockito .verify(node, Mockito.only()) - .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); + .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), + COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT); } @Test @@ -115,11 +117,13 @@ public void allowFlowRegistrationWhenWarningViolationFound() throws IOException, Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.singletonList(ruleViolation3)); final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper); - testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); + testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), + COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT); Mockito .verify(node, Mockito.only()) - .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); + .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), + COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT); } @Test @@ -129,11 +133,13 @@ public void preventFlowRegistrationWhenEnforcingViolationFound() throws IOExcept Assertions.assertThrows( FlowRegistryPreCommitException.class, - () -> testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION) + () -> testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), + COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT) ); Mockito .verify(node, Mockito.never()) - .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); + .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), + COMMENT_TEXT, EXPECTED_VERSION, RegisterAction.COMMIT); } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java index 8fe50f2d4e1ee..179f6210cd816 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Optional; import java.util.Set; public interface FlowRegistryClientNode extends ComponentNode { @@ -44,7 +45,7 @@ public interface FlowRegistryClientNode extends ComponentNode { RegisteredFlow getFlow(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; Set getFlows(FlowRegistryClientUserContext context, String bucketId) throws FlowRegistryException, IOException; - FlowSnapshotContainer getFlowContents(FlowRegistryClientUserContext context, String bucketId, String flowId, int version, boolean fetchRemoteFlows) throws FlowRegistryException, IOException; + FlowSnapshotContainer getFlowContents(FlowRegistryClientUserContext context, String bucketId, String flowId, String version, boolean fetchRemoteFlows) throws FlowRegistryException, IOException; RegisteredFlowSnapshot registerFlowSnapshot( FlowRegistryClientUserContext context, RegisteredFlow flow, @@ -52,11 +53,11 @@ RegisteredFlowSnapshot registerFlowSnapshot( Map externalControllerServices, Map parameterContexts, Map parameterProviderReferences, String comments, - int expectedVersion + String expectedVersion, RegisterAction registerAction ) throws FlowRegistryException, IOException; Set getFlowVersions(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; - int getLatestVersion(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; + Optional getLatestVersion(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException; void setComponent(LoggableComponent component); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/GhostFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/GhostFlowRegistryClient.java index 93983d992eab7..c799dfc86a256 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/GhostFlowRegistryClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/GhostFlowRegistryClient.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; public class GhostFlowRegistryClient implements FlowRegistryClient { @@ -112,12 +113,14 @@ public Set getFlows(final FlowRegistryClientConfigurationContext } @Override - public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, int version) throws FlowRegistryException { + public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version) + throws FlowRegistryException { throw new FlowRegistryException(ERROR_MESSAGE); } @Override - public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws FlowRegistryException { + public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot, final RegisterAction action) + throws FlowRegistryException { throw new FlowRegistryException(ERROR_MESSAGE); } @@ -127,7 +130,7 @@ public Set getFlowVersions(final FlowRegistryCli } @Override - public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException { + public Optional getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException { throw new FlowRegistryException(ERROR_MESSAGE); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java index 8b68fbf7034aa..d038fbe34b12b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java @@ -70,7 +70,7 @@ public interface VersionControlInformation { /** * @return the version of the flow in the Flow Registry that this flow is based on. */ - int getVersion(); + String getVersion(); /** * @return the current status of the Process Group as it relates to the associated Versioned Flow. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java index a4d1dda842663..a7cbc92aa8d61 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java @@ -595,7 +595,7 @@ private VersionControlInformation prepareVersionControlInfo() { when(versionControlInformation.getRegistryIdentifier()).thenReturn(UUID.randomUUID().toString()); when(versionControlInformation.getBucketIdentifier()).thenReturn(UUID.randomUUID().toString()); when(versionControlInformation.getFlowIdentifier()).thenReturn(UUID.randomUUID().toString()); - when(versionControlInformation.getVersion()).thenReturn(counter++); + when(versionControlInformation.getVersion()).thenReturn(String.valueOf(counter++)); when(versionControlInformation.getStorageLocation()).thenReturn("http://localhost:18080"); return versionControlInformation; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 282f2a8486c1f..424e1a6cb3fac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -36,6 +36,7 @@ import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterGroupConfiguration; import org.apache.nifi.registry.flow.FlowSnapshotContainer; +import org.apache.nifi.registry.flow.RegisterAction; import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.web.api.dto.AccessPolicyDTO; @@ -1516,8 +1517,8 @@ Set getControllerServiceTypes(final String serviceType, final RegisteredFlowSnapshot registerVersionedFlowSnapshot(String registryId, RegisteredFlow flow, VersionedProcessGroup snapshot, Map parameterContexts, Map parameterProviderReferences, - Map externalControllerServiceReferences, - String comments, int expectedVersion); + Map externalControllerServiceReferences, + String comments, String expectedVersion, RegisterAction registerAction); /** * Updates the Version Control Information on the Process Group with the given ID diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 6d62c62208498..865a93e974d3b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -150,6 +150,7 @@ import org.apache.nifi.registry.flow.FlowRegistryPermissions; import org.apache.nifi.registry.flow.FlowRegistryUtil; import org.apache.nifi.registry.flow.FlowSnapshotContainer; +import org.apache.nifi.registry.flow.RegisterAction; import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; @@ -4975,14 +4976,10 @@ public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final S final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow(); final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + final VersionControlInformation currentVci = processGroup.getVersionControlInformation(); - int snapshotVersion; - if (VersionedFlowDTO.FORCE_COMMIT_ACTION.equals(versionedFlowDto.getAction())) { - snapshotVersion = -1; - } else { - final VersionControlInformation currentVci = processGroup.getVersionControlInformation(); - snapshotVersion = currentVci == null ? 1 : currentVci.getVersion() + 1; - } + final String snapshotVersion = currentVci == null ? null : currentVci.getVersion(); + final RegisterAction registerAction = RegisterAction.valueOf(versionedFlowDto.getAction()); // Create a VersionedProcessGroup snapshot of the flow as it is currently. final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId); @@ -5021,7 +5018,7 @@ public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final S try { // add a snapshot to the flow in the registry registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, parameterContexts, parameterProviderReferences, - versionedProcessGroup.getExternalControllerServiceReferences(), versionedFlowDto.getComments(), snapshotVersion); + versionedProcessGroup.getExternalControllerServiceReferences(), versionedFlowDto.getComments(), snapshotVersion, registerAction); } catch (final NiFiCoreException e) { // If the flow has been created, but failed to add a snapshot, // then we need to capture the created versioned flow information as a partial successful result. @@ -5177,7 +5174,7 @@ public FlowSnapshotContainer getVersionedFlowSnapshot(final VersionControlInform * @return a VersionedFlowSnapshot from a registry with the given version */ private FlowSnapshotContainer getVersionedFlowSnapshot(final String registryId, final String bucketId, final String flowId, - final Integer flowVersion, final boolean fetchRemoteFlows) { + final String flowVersion, final boolean fetchRemoteFlows) { final FlowRegistryClientNode flowRegistry = flowRegistryDAO.getFlowRegistryClient(registryId); if (flowRegistry == null) { throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId); @@ -5326,8 +5323,8 @@ private RegisteredFlow getVersionedFlow(final String registryId, final String bu public RegisteredFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final RegisteredFlow flow, final VersionedProcessGroup snapshot, final Map parameterContexts, final Map parameterProviderReferences, - final Map externalControllerServiceReferences, final String comments, - final int expectedVersion) { + final Map externalControllerServiceReferences, final String comments, + final String expectedVersion, final RegisterAction registerAction) { final FlowRegistryClientNode registry = flowRegistryDAO.getFlowRegistryClient(registryId); if (registry == null) { throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); @@ -5336,7 +5333,7 @@ public RegisteredFlowSnapshot registerVersionedFlowSnapshot(final String registr try { return registry.registerFlowSnapshot(FlowRegistryClientContextFactory.getContextForUser( NiFiUserUtils.getNiFiUser()), flow, snapshot, externalControllerServiceReferences, parameterContexts, - parameterProviderReferences, comments, expectedVersion); + parameterProviderReferences, comments, expectedVersion, registerAction); } catch (final IOException | FlowRegistryException e) { throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 33934aa267f29..305d95b634fd1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -178,7 +178,7 @@ public Response exportFlowVersion(@Parameter(description = "The process group id final VersionedProcessGroup versionedProcessGroup = versionedFlowSnapshot.getFlowContents(); final String flowName = versionedProcessGroup.getName(); - final int flowVersion = versionedFlowSnapshot.getSnapshotMetadata().getVersion(); + final String flowVersion = versionedFlowSnapshot.getSnapshotMetadata().getVersion(); // clear top-level registry data which doesn't belong in versioned flow download versionedFlowSnapshot.setFlow(null); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java index a9a24bffb1a7a..0341f6f7ccdc5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -162,7 +163,7 @@ public Set getFlowVersionsForUser(final FlowRegi } final Set flowVersions = flowRegistry.getFlowVersions(context, bucketId, flowId); - final Set sortedFlowVersions = new TreeSet<>((f1, f2) -> Integer.compare(f1.getVersion(), f2.getVersion())); + final Set sortedFlowVersions = new TreeSet<>(Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp)); sortedFlowVersions.addAll(flowVersions); return sortedFlowVersions; } catch (final IOException | FlowRegistryException ioe) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java index 026c6dc46a4b3..a033c8635bf92 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java @@ -53,7 +53,7 @@ public void testExportFlowVersion() { when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(snapshotContainer); final String flowName = "flowname"; - final int flowVersion = 1; + final String flowVersion = "1"; final VersionedProcessGroup versionedProcessGroup = mock(VersionedProcessGroup.class); final RegisteredFlowSnapshotMetadata snapshotMetadata = mock(RegisteredFlowSnapshotMetadata.class); when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup); diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java index 9097d58781dc2..46dbba619bf5e 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java @@ -33,7 +33,7 @@ public static VersionedExternalFlow createVersionedExternalFlow(final VersionedF externalFlowMetadata.setComments(snapshotMetadata.getComments()); externalFlowMetadata.setFlowIdentifier(snapshotMetadata.getFlowIdentifier()); externalFlowMetadata.setTimestamp(snapshotMetadata.getTimestamp()); - externalFlowMetadata.setVersion(snapshotMetadata.getVersion()); + externalFlowMetadata.setVersion(String.valueOf(snapshotMetadata.getVersion())); } final VersionedFlow versionedFlow = flowSnapshot.getFlow(); diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java index 57d4fe1e36a8e..2e6cf79a7ef34 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java @@ -78,7 +78,7 @@ public String describeDifference(final DifferenceType type, final String flowANa // If the two vary only by version, then use a more concise message. If anything else is different, then use a fully explanation. if (Objects.equals(coordinatesA.getStorageLocation(), coordinatesB.getStorageLocation()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId()) - && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && coordinatesA.getVersion() != coordinatesB.getVersion()) { + && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && !Objects.equals(coordinatesA.getVersion(), coordinatesB.getVersion())) { description = String.format("Flow Version changed from %s to %s", coordinatesA.getVersion(), coordinatesB.getVersion()); break; diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java index c37519a6380a0..1c82a2b17ac3b 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java @@ -72,7 +72,7 @@ public String describeDifference(final DifferenceType type, final String flowANa // If the two vary only by version, then use a more concise message. If anything else is different, then use a fully explanation. if (Objects.equals(coordinatesA.getStorageLocation(), coordinatesB.getStorageLocation()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId()) - && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && coordinatesA.getVersion() != coordinatesB.getVersion()) { + && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && !Objects.equals(coordinatesA.getVersion(), coordinatesB.getVersion())) { description = String.format("Flow Version is %s in %s but %s in %s", coordinatesA.getVersion(), flowAName, coordinatesB.getVersion(), flowBName); break; diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java index 3cb2bf081e1ab..3ea57d79a5be6 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -75,7 +76,7 @@ public Set getFlows(FlowRegistryClientConfigurationContext conte } @Override - public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot) { + public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot, RegisterAction registerAction) { throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED); } @@ -95,7 +96,8 @@ public RegisteredFlow getFlow(FlowRegistryClientConfigurationContext context, St } @Override - public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version) throws FlowRegistryException { + public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version) + throws FlowRegistryException { if (context.getNiFiUserIdentity().isPresent()) { throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED); } @@ -104,7 +106,7 @@ public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurat final List snapshots = flowSnapshots.get(flowCoordinates); final VersionedExternalFlow registeredFlowSnapshot = snapshots.stream() - .filter(snapshot -> snapshot.getMetadata().getVersion() == version) + .filter(snapshot -> Objects.equals(snapshot.getMetadata().getVersion(), version)) .findAny() .orElseThrow(() -> new FlowRegistryException("Could not find flow: bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version)); @@ -138,11 +140,11 @@ public synchronized void addFlowSnapshot(final VersionedExternalFlow versionedEx final VersionedExternalFlowMetadata metadata = versionedExternalFlow.getMetadata(); final String bucketId; final String flowId; - final int version; + final String version; if (metadata == null) { bucketId = DEFAULT_BUCKET_ID; flowId = "flow-" + flowIdGenerator.getAndIncrement(); - version = 1; + version = "1"; } else { bucketId = metadata.getBucketIdentifier(); flowId = metadata.getFlowIdentifier(); @@ -169,7 +171,7 @@ public Set getFlowVersions(FlowRegistryClientCon } @Override - public int getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) { + public Optional getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) { throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED); } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java index 80956bc3d84fd..dce02af651ddf 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java @@ -148,7 +148,7 @@ private void populateVersionedContentsRecursively(final VersionedProcessGroup gr final String subRegistryUrl = getBaseRegistryUrl(coordinates.getStorageLocation()); final String bucketId = coordinates.getBucketId(); final String flowId = coordinates.getFlowId(); - final int version = coordinates.getVersion(); + final int version = Integer.parseInt(coordinates.getVersion()); final RegistryUtil subFlowUtil = getSubRegistryUtil(subRegistryUrl); final VersionedFlowSnapshot snapshot = subFlowUtil.getFlowByID(bucketId, flowId, version); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java index ceb5ddaa301b3..00c115a97f944 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java @@ -79,7 +79,7 @@ private VersionedFlowSnapshot buildRootSnapshot(final Set coordinates.setStorageLocation(storageLocation); coordinates.setBucketId(ROOT_BUCKET_ID); coordinates.setFlowId(ROOT_FLOW_ID); - coordinates.setVersion(ROOT_VERSION); + coordinates.setVersion(String.valueOf(ROOT_VERSION)); final VersionedProcessGroup group = new VersionedProcessGroup(); group.setVersionedFlowCoordinates(coordinates); @@ -96,7 +96,7 @@ private VersionedProcessGroup getChildVersionedProcessGroup() { coordinates.setStorageLocation(storageLocation); coordinates.setBucketId(CHILD_BUCKET_ID); coordinates.setFlowId(CHILD_FLOW_ID); - coordinates.setVersion(CHILD_VERSION); + coordinates.setVersion(String.valueOf(CHILD_VERSION)); final VersionedProcessGroup group = new VersionedProcessGroup(); group.setVersionedFlowCoordinates(coordinates); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java index e7ec904e753a9..df6af904fbbe1 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java @@ -30,6 +30,7 @@ import org.apache.nifi.registry.flow.FlowRegistryBucket; import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext; import org.apache.nifi.registry.flow.FlowRegistryPermissions; +import org.apache.nifi.registry.flow.RegisterAction; import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; @@ -209,7 +210,7 @@ public Set getFlows(final FlowRegistryClientConfigurationContext } @Override - public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version) throws IOException { + public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version) throws IOException { final File flowDir = getFlowDirectory(context, bucketId, flowId); final Pattern intPattern = Pattern.compile("\\d+"); final File[] versionFiles = flowDir.listFiles(file -> intPattern.matcher(file.getName()).matches()); @@ -241,7 +242,7 @@ private void populateBucket(final RegisteredFlowSnapshot snapshot, final String snapshot.getSnapshotMetadata().setBucketIdentifier(bucketId); } - private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bucketId, final String flowId, final int version, final int numVersions) { + private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bucketId, final String flowId, final String version, final int numVersions) { final RegisteredFlow existingFlow = snapshot.getFlow(); if (existingFlow != null) { return; @@ -258,7 +259,7 @@ private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bu flow.setVersionCount(numVersions); final RegisteredFlowVersionInfo versionInfo = new RegisteredFlowVersionInfo(); - versionInfo.setVersion(version); + versionInfo.setVersion(numVersions); flow.setVersionInfo(versionInfo); snapshot.setFlow(flow); @@ -266,15 +267,18 @@ private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bu } @Override - public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws IOException { + public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot, final RegisterAction registerAction) + throws IOException { final RegisteredFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); final String bucketId = metadata.getBucketIdentifier(); final String flowId = metadata.getFlowIdentifier(); final File flowDir = getFlowDirectory(context, bucketId, flowId); - final long version = metadata.getVersion(); - final File versionDir = getChildLocation(flowDir, Paths.get(String.valueOf(version))); + final int latestVersion = getLatestFlowVersionInt(context, bucketId, flowId); + final String version = latestVersion == -1 ? "1" : String.valueOf(latestVersion + 1); + flowSnapshot.getSnapshotMetadata().setVersion(version); // Create the directory for the version, if it doesn't exist. + final File versionDir = getChildLocation(flowDir, Paths.get(version)); if (!versionDir.exists()) { Files.createDirectories(versionDir.toPath()); } @@ -364,7 +368,7 @@ public Set getFlowVersions(final FlowRegistryCli final String versionName = versionDir.getName(); final RegisteredFlowSnapshotMetadata metadata = new RegisteredFlowSnapshotMetadata(); - metadata.setVersion(Integer.parseInt(versionName)); + metadata.setVersion(versionName); metadata.setTimestamp(versionDir.lastModified()); metadata.setFlowIdentifier(flowId); metadata.setBucketIdentifier(bucketId); @@ -376,7 +380,12 @@ public Set getFlowVersions(final FlowRegistryCli } @Override - public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException { + public Optional getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException { + final int latestVersion = getLatestFlowVersionInt(context, bucketId, flowId); + return latestVersion == -1 ? Optional.empty() : Optional.of(String.valueOf(latestVersion)); + } + + private int getLatestFlowVersionInt(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException { final File flowDir = getFlowDirectory(context, bucketId, flowId); final File[] versionDirs = flowDir.listFiles(); if (versionDirs == null) { @@ -384,13 +393,13 @@ public int getLatestVersion(final FlowRegistryClientConfigurationContext context } final OptionalInt greatestValue = Arrays.stream(versionDirs) - .map(File::getName) - .mapToInt(Integer::parseInt) - .max(); + .map(File::getName) + .mapToInt(Integer::parseInt) + .max(); return greatestValue.orElse(-1); } - private File getSnapshotFile(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final long version) { + private File getSnapshotFile(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version) { final File flowDirectory = getFlowDirectory(context, bucketId, flowId); final File versionDirectory = getChildLocation(flowDirectory, Paths.get(String.valueOf(version))); return new File(versionDirectory, "snapshot.json"); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index aa3469ce6c2ed..1038611d61013 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -1794,7 +1794,7 @@ public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, fin return importFlowFromRegistry(parentGroupId, vciDto.getRegistryId(), vciDto.getBucketId(), vciDto.getFlowId(), vciDto.getVersion()); } - public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, final String registryClientId, final String bucketId, final String flowId, final int version) + public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, final String registryClientId, final String bucketId, final String flowId, final String version) throws NiFiClientException, IOException { final VersionControlInformationDTO vci = new VersionControlInformationDTO(); @@ -1813,11 +1813,11 @@ public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, fin return nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, groupEntity); } - public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version) throws NiFiClientException, IOException, InterruptedException { + public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final String version) throws NiFiClientException, IOException, InterruptedException { return changeFlowVersion(processGroupId, version, true); } - public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version, final boolean throwOnFailure) + public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final String version, final boolean throwOnFailure) throws NiFiClientException, IOException, InterruptedException { logger.info("Submitting Change Flow Version request to change Group with ID {} to Version {}", processGroupId, version); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java index 36511c6f06214..0a9a7ee248b54 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java @@ -48,7 +48,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -103,7 +102,7 @@ public void testChangeVersionOnParentThatCascadesToChild() throws NiFiClientExce util.assertFlowUpToDate(child.getId()); // Verify that we are able to switch back to v1 while everything is stopped - util.changeFlowVersion(parent.getId(), 1); + util.changeFlowVersion(parent.getId(), "1"); util.assertFlowStaleAndUnmodified(parent.getId()); util.assertFlowStaleAndUnmodified(child.getId()); @@ -119,7 +118,7 @@ public void testChangeVersionOnParentThatCascadesToChild() throws NiFiClientExce assertEquals("Updated", contents); // Switch Version back to v2 while it's running - util.changeFlowVersion(parent.getId(), 2); + util.changeFlowVersion(parent.getId(), "2"); util.assertFlowUpToDate(parent.getId()); util.assertFlowUpToDate(child.getId()); @@ -133,7 +132,7 @@ public void testChangeVersionOnParentThatCascadesToChild() throws NiFiClientExce assertEquals("Updated v2", secondFlowFileContents); // Switch back to v1 while flow is running to verify that the version can change back to a lower version as well - util.changeFlowVersion(parent.getId(), 1); + util.changeFlowVersion(parent.getId(), "1"); util.assertFlowStaleAndUnmodified(parent.getId()); util.assertFlowStaleAndUnmodified(child.getId()); @@ -192,8 +191,8 @@ public void testChangeConnectionDestinationRemoveOldAndMoveGroup() throws NiFiCl // Save the flow as v2 util.saveFlowVersion(parent, clientEntity, v1Vci); - util.changeFlowVersion(parent.getId(), 1); - util.changeFlowVersion(parent.getId(), 2); + util.changeFlowVersion(parent.getId(), "1"); + util.changeFlowVersion(parent.getId(), "2"); } @@ -225,7 +224,7 @@ public void testControllerServiceUpdateWhileRunning() throws NiFiClientException util.saveFlowVersion(group, clientEntity, vci); // Change back to v1 and start the flow - util.changeFlowVersion(group.getId(), 1); + util.changeFlowVersion(group.getId(), "1"); util.assertFlowStaleAndUnmodified(group.getId()); util.enableControllerService(service); @@ -240,14 +239,14 @@ public void testControllerServiceUpdateWhileRunning() throws NiFiClientException assertEquals("1", firstFlowFileAttributes.get("count")); // Change to v2 and ensure that the output is correct - util.changeFlowVersion(group.getId(), 2); + util.changeFlowVersion(group.getId(), "2"); util.assertFlowUpToDate(group.getId()); waitForQueueCount(connectionToTerminate.getId(), 2 * getNumberOfNodes()); final Map secondFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes()).getFlowFile().getAttributes(); assertEquals("2001", secondFlowFileAttributes.get("count")); // Change back to v1 and ensure that the output is correct. It should reset count back to 0. - util.changeFlowVersion(group.getId(), 1); + util.changeFlowVersion(group.getId(), "1"); util.assertFlowStaleAndUnmodified(group.getId()); waitForQueueCount(connectionToTerminate.getId(), 3 * getNumberOfNodes()); final Map thirdFlowFileAttributes = util.getQueueFlowFile(connectionToTerminate.getId(), getNumberOfNodes() * 2).getFlowFile().getAttributes(); @@ -259,7 +258,7 @@ public void testControllerServiceUpdateWhileRunning() throws NiFiClientException public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException { final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows")); - final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "port-moved-groups", 1); + final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "port-moved-groups", "1"); assertNotNull(imported); getClientUtil().assertFlowStaleAndUnmodified(imported.getId()); @@ -267,13 +266,13 @@ public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientExcept final FlowSnippetDTO groupContents = imported.getComponent().getContents(); final List replaceTextProcessors = groupContents.getProcessors().stream() .filter(proc -> proc.getName().equals("ReplaceText")) - .collect(Collectors.toList()); + .toList(); assertEquals(1, replaceTextProcessors.size()); assertTrue(groupContents.getInputPorts().isEmpty()); // Change to version 2 - final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2); + final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), "2"); assertNull(version2Result.getRequest().getFailureReason()); final FlowDTO v2Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow(); @@ -291,7 +290,7 @@ public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientExcept assertEquals(1, v2Contents.getInputPorts().size()); // Change back to Version 1 - final VersionedFlowUpdateRequestEntity changeBackToV1Result = getClientUtil().changeFlowVersion(imported.getId(), 1); + final VersionedFlowUpdateRequestEntity changeBackToV1Result = getClientUtil().changeFlowVersion(imported.getId(), "1"); assertNull(changeBackToV1Result.getRequest().getFailureReason()); final FlowDTO v1Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow(); @@ -306,11 +305,11 @@ public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientExcept public void testRollbackOnFailure() throws NiFiClientException, IOException, InterruptedException { final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows")); - final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "flow-with-invalid-connection", 1); + final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "flow-with-invalid-connection", "1"); assertNotNull(imported); getClientUtil().assertFlowStaleAndUnmodified(imported.getId()); - final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2, false); + final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), "2", false); final String failureReason = version2Result.getRequest().getFailureReason(); assertNotNull(failureReason); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java index 7ac63133bd28b..5ea5934cf6833 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/stateless/StatelessBasicsIT.java @@ -761,14 +761,14 @@ public void testChangeFlowVersion() throws NiFiClientException, IOException, Int getClientUtil().stopProcessor(generate); // Switch back to v1 while flow is running - getClientUtil().changeFlowVersion(statelessGroup.getId(), 1); + getClientUtil().changeFlowVersion(statelessGroup.getId(), "1"); getClientUtil().startProcessor(generate); waitForQueueCount(outputToTerminate, 2); assertEquals(HELLO_WORLD, getClientUtil().getFlowFileContentAsUtf8(outputToTerminate.getId(), 1)); getClientUtil().stopProcessor(generate); // Switch back to v2 while flow is running - getClientUtil().changeFlowVersion(statelessGroup.getId(), 2); + getClientUtil().changeFlowVersion(statelessGroup.getId(), "2"); getClientUtil().startProcessor(generate); waitForQueueCount(outputToTerminate, 3); assertEquals(HELLO_WORLD_REVERSED, getClientUtil().getFlowFileContentAsUtf8(outputToTerminate.getId(), 2)); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGChangeVersion.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGChangeVersion.java index cf332b00c62e1..7d00d98d80357 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGChangeVersion.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGChangeVersion.java @@ -17,6 +17,7 @@ package org.apache.nifi.toolkit.cli.impl.command.nifi.pg; import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; import org.apache.nifi.toolkit.cli.api.CommandException; import org.apache.nifi.toolkit.cli.api.Context; import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient; @@ -33,6 +34,7 @@ import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity; import java.io.IOException; +import java.util.Objects; import java.util.Properties; /** @@ -71,13 +73,13 @@ public VoidResult doExecute(final NiFiClient client, final Properties properties } // start with the version specified in the arguments - Integer newVersion = getIntArg(properties, CommandOption.FLOW_VERSION); + String newVersion = getArg(properties, CommandOption.FLOW_VERSION); // if no version was specified, automatically determine the latest and change to that if (newVersion == null) { newVersion = getLatestVersion(client, existingVersionControlDTO); - if (newVersion.intValue() == existingVersionControlDTO.getVersion().intValue()) { + if (Objects.equals(newVersion, existingVersionControlDTO.getVersion())) { throw new NiFiClientException("Process group already at latest version"); } } @@ -121,7 +123,7 @@ public VoidResult doExecute(final NiFiClient client, final Properties properties return VoidResult.getInstance(); } - private int getLatestVersion(final NiFiClient client, final VersionControlInformationDTO existingVersionControlDTO) + private String getLatestVersion(final NiFiClient client, final VersionControlInformationDTO existingVersionControlDTO) throws NiFiClientException, IOException { final FlowClient flowClient = client.getFlowClient(); @@ -133,11 +135,17 @@ private int getLatestVersion(final NiFiClient client, final VersionControlInform if (versions.getVersionedFlowSnapshotMetadataSet() == null || versions.getVersionedFlowSnapshotMetadataSet().isEmpty()) { throw new NiFiClientException("No versions available"); } + return getLatestVersion(versions); + } - int latestVersion = 1; + private static String getLatestVersion(final VersionedFlowSnapshotMetadataSetEntity versions) { + long latestTimestamp = 0; + String latestVersion = null; for (VersionedFlowSnapshotMetadataEntity version : versions.getVersionedFlowSnapshotMetadataSet()) { - if (version.getVersionedFlowSnapshotMetadata().getVersion() > latestVersion) { - latestVersion = version.getVersionedFlowSnapshotMetadata().getVersion(); + final RegisteredFlowSnapshotMetadata versionMetadata = version.getVersionedFlowSnapshotMetadata(); + if (versionMetadata.getTimestamp() > latestTimestamp) { + latestTimestamp = versionMetadata.getTimestamp(); + latestVersion = versionMetadata.getVersion(); } } return latestVersion; diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java index 8afed8b42b98d..1d133b6df3dba 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java @@ -74,7 +74,7 @@ public StringResult doExecute(final NiFiClient client, final Properties properti final String bucketId = getRequiredArg(properties, CommandOption.BUCKET_ID); final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID); - final Integer flowVersion = getRequiredIntArg(properties, CommandOption.FLOW_VERSION); + final String flowVersion = getRequiredArg(properties, CommandOption.FLOW_VERSION); final String posXStr = getArg(properties, CommandOption.POS_X); final String posYStr = getArg(properties, CommandOption.POS_Y); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/registry/VersionedFlowSnapshotMetadataSetResult.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/registry/VersionedFlowSnapshotMetadataSetResult.java index 717805f6f6e5a..2e60f01e2e059 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/registry/VersionedFlowSnapshotMetadataSetResult.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/registry/VersionedFlowSnapshotMetadataSetResult.java @@ -17,19 +17,21 @@ package org.apache.nifi.toolkit.cli.impl.result.registry; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; -import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.toolkit.cli.api.ResultType; -import org.apache.nifi.toolkit.cli.api.WritableResult; import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult; +import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter; +import org.apache.nifi.toolkit.cli.impl.result.writer.Table; +import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity; import java.io.IOException; import java.io.PrintStream; +import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; /** * Result for VersionedFlowSnapshotMetadataSetEntity. @@ -56,24 +58,34 @@ protected void writeSimpleResult(final PrintStream output) throws IOException { return; } - // this will be sorted by the child result below - final List snapshots = entities.stream() - .map(v -> v.getVersionedFlowSnapshotMetadata()) - .map(this::convert) - .collect(Collectors.toList()); + // sort by timestamp + final List snapshots = entities.stream() + .map(VersionedFlowSnapshotMetadataEntity::getVersionedFlowSnapshotMetadata) + .sorted(Comparator.comparing(RegisteredFlowSnapshotMetadata::getTimestamp)) + .toList(); - final WritableResult> result = new RegisteredFlowSnapshotMetadataResult(resultType, snapshots); - result.write(output); - } + // date length, with locale specifics + final String datePattern = "%1$ta, % { + table.addRow( + vfs.getVersion(), + String.format(datePattern, new Date(vfs.getTimestamp())), + vfs.getAuthor() == null ? "" : vfs.getAuthor(), + vfs.getComments() == null ? "" : vfs.getComments() + ); + }); + + final TableWriter tableWriter = new DynamicTableWriter(); + tableWriter.write(table, output); } + }