Skip to content

Commit

Permalink
Checkpoint after switching int version to string version
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Apr 18, 2024
1 parent 6eff8a9 commit c0524c6
Show file tree
Hide file tree
Showing 37 changed files with 244 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class VersionedExternalFlowMetadata {
private String bucketId;
private String flowId;
private int version;
private String version;
private String flowName;
private String author;
private String comments;
Expand All @@ -42,11 +42,11 @@ public void setFlowIdentifier(final String flowId) {
this.flowId = flowId;
}

public int getVersion() {
public String getVersion() {
return version;
}

public void setVersion(final int version) {
public void setVersion(final String version) {
this.version = version;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class VersionedFlowCoordinates {
private String storageLocation;
private String bucketId;
private String flowId;
private int version;
private String version;
private Boolean latest;

@Schema(description = "The identifier of the Flow Registry that contains the flow")
Expand Down Expand Up @@ -66,11 +66,11 @@ public void setFlowId(String flowId) {
}

@Schema(description = "The version of the flow")
public int getVersion() {
public String getVersion() {
return version;
}

public void setVersion(int version) {
public void setVersion(String version) {
this.version = version;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.nifi.components.ConfigurableComponent;

import java.io.IOException;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -165,20 +166,21 @@ public interface FlowRegistryClient extends ConfigurableComponent {
* @throws FlowRegistryException If an issue happens during processing the request.
* @throws IOException If there is issue with the communication between NiFi and the Flow Registry.
*/
RegisteredFlowSnapshot getFlowContents(FlowRegistryClientConfigurationContext context, String bucketId, String flowId, int version) throws FlowRegistryException, IOException;
RegisteredFlowSnapshot getFlowContents(FlowRegistryClientConfigurationContext context, String bucketId, String flowId, String version) throws FlowRegistryException, IOException;

/**
* Adds the given snapshot to the Flow Registry for the given Flow.
*
* @param context Configuration context.
* @param flowSnapshot The flow snapshot to register.
* @param action The register action
*
* @return The flow snapshot.
*
* @throws FlowRegistryException If an issue happens during processing the request.
* @throws IOException If there is issue with the communication between NiFi and the Flow Registry.
*/
RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot) throws FlowRegistryException, IOException;
RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot, RegisterAction action) throws FlowRegistryException, IOException;

/**
* Retrieves the set of all versions of the specified flow.
Expand Down Expand Up @@ -208,5 +210,5 @@ public interface FlowRegistryClient extends ConfigurableComponent {
* @throws FlowRegistryException If an issue happens during processing the request.
* @throws IOException If there is issue with the communication between NiFi and the Flow Registry.
*/
int getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) throws FlowRegistryException, IOException;
Optional<String> getLatestVersion(FlowRegistryClientConfigurationContext context, String bucketId, String flowId) throws FlowRegistryException, IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package org.apache.nifi.registry.flow;

public enum RegisterAction {
COMMIT,
FORCE_COMMIT;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class RegisteredFlowSnapshot {
private Map<String, VersionedParameterContext> parameterContexts;
private String flowEncodingVersion;
private Map<String, ParameterProviderReference> parameterProviders;
private boolean latest;

public RegisteredFlowSnapshotMetadata getSnapshotMetadata() {
return snapshotMetadata;
Expand Down Expand Up @@ -62,7 +63,7 @@ public String getFlowEncodingVersion() {
}

public boolean isLatest() {
return flow != null && snapshotMetadata != null && flow.getVersionCount() == getSnapshotMetadata().getVersion();
return latest;
}

public void setSnapshotMetadata(final RegisteredFlowSnapshotMetadata snapshotMetadata) {
Expand Down Expand Up @@ -100,4 +101,8 @@ public Map<String, ParameterProviderReference> getParameterProviders() {
public void setParameterProviders(final Map<String, ParameterProviderReference> parameterProviders) {
this.parameterProviders = parameterProviders;
}

public void setLatest(final boolean latest) {
this.latest = latest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class RegisteredFlowSnapshotMetadata {
private String bucketIdentifier;
private String flowIdentifier;
private int version;
private String version;
private long timestamp;
private String author;
private String comments;
Expand All @@ -32,7 +32,7 @@ public String getFlowIdentifier() {
return flowIdentifier;
}

public int getVersion() {
public String getVersion() {
return version;
}

Expand All @@ -56,7 +56,7 @@ public void setFlowIdentifier(String flowIdentifier) {
this.flowIdentifier = flowIdentifier;
}

public void setVersion(int version) {
public void setVersion(String version) {
this.version = version;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -229,45 +230,62 @@ public Set<RegisteredFlow> getFlows(final FlowRegistryClientConfigurationContext

@Override
public RegisteredFlowSnapshot getFlowContents(
final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version
final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final String version
) throws FlowRegistryException, IOException {
try {
final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(context);
final VersionedFlowSnapshot snapshot = snapshotClient.get(bucketId, flowId, version);
final VersionedFlowSnapshot snapshot = snapshotClient.get(bucketId, flowId, Integer.parseInt(version));

if (snapshot == null) {
throw new NoSuchFlowVersionException(String.format("Version %d of flow %s does not exist in bucket %s", version, flowId, bucketId));
throw new NoSuchFlowVersionException(String.format("Version %s of flow %s does not exist in bucket %s", version, flowId, bucketId));
}

return NifiRegistryUtil.convert(snapshot);
final RegisteredFlowSnapshot registeredFlowSnapshot = NifiRegistryUtil.convert(snapshot);
registeredFlowSnapshot.setLatest(snapshot.getSnapshotMetadata().getVersion() == snapshot.getFlow().getVersionCount());
return registeredFlowSnapshot;
} catch (final NiFiRegistryException e) {
throw new FlowRegistryException(e.getMessage(), e);
}
}

@Override
public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot) throws FlowRegistryException, IOException {
public RegisteredFlowSnapshot registerFlowSnapshot(FlowRegistryClientConfigurationContext context, RegisteredFlowSnapshot flowSnapshot, RegisterAction action)
throws FlowRegistryException, IOException {
try {
final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
final int snapshotVersion = getRegisteredFlowSnapshotVersion(snapshotMetadata, action);
snapshotMetadata.setVersion(String.valueOf(snapshotVersion));

final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(context);
final VersionedFlowSnapshot versionedFlowSnapshot = snapshotClient.create(NifiRegistryUtil.convert(flowSnapshot));
final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates();

final String bucketId = versionedFlowSnapshot.getFlow().getBucketIdentifier();
final String flowId = versionedFlowSnapshot.getFlow().getIdentifier();
final int version = (int) versionedFlowSnapshot.getFlow().getVersionCount();

final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates();
versionedFlowCoordinates.setRegistryId(getIdentifier());
versionedFlowCoordinates.setBucketId(bucketId);
versionedFlowCoordinates.setFlowId(flowId);
versionedFlowCoordinates.setVersion(version);
versionedFlowCoordinates.setVersion(String.valueOf(version));
versionedFlowCoordinates.setStorageLocation(getProposedUri(context) + "/nifi-registry-api/buckets/" + bucketId + "/flows/" + flowId + "/versions/" + version);
versionedFlowSnapshot.getFlowContents().setVersionedFlowCoordinates(versionedFlowCoordinates);
return NifiRegistryUtil.convert(versionedFlowSnapshot);

final RegisteredFlowSnapshot registeredFlowSnapshot = NifiRegistryUtil.convert(versionedFlowSnapshot);
registeredFlowSnapshot.setLatest(true);
return registeredFlowSnapshot;
} catch (NiFiRegistryException e) {
throw new FlowRegistryException(e.getMessage(), e);
}
}

private static int getRegisteredFlowSnapshotVersion(final RegisteredFlowSnapshotMetadata snapshotMetadata, final RegisterAction action) {
if (RegisterAction.FORCE_COMMIT == action) {
return -1;
}
return snapshotMetadata.getVersion() == null ? 1 : Integer.parseInt(snapshotMetadata.getVersion()) + 1;
}

@Override
public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(
final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId
Expand All @@ -281,9 +299,10 @@ public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(
}

@Override
public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
public Optional<String> getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
try {
return (int) getFlowClient(context).get(bucketId, flowId).getVersionCount();
final int versionCount = (int) getFlowClient(context).get(bucketId, flowId).getVersionCount();
return versionCount == 0 ? Optional.empty() : Optional.of(String.valueOf(versionCount));
} catch (NiFiRegistryException e) {
throw new FlowRegistryException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static RegisteredFlowSnapshotMetadata convert(final VersionedFlowSnapshotMetadat
final RegisteredFlowSnapshotMetadata result = new RegisteredFlowSnapshotMetadata();
result.setBucketIdentifier(metadata.getBucketIdentifier());
result.setFlowIdentifier(metadata.getFlowIdentifier());
result.setVersion(metadata.getVersion());
result.setVersion(String.valueOf(metadata.getVersion()));
result.setTimestamp(metadata.getTimestamp());
result.setAuthor(metadata.getAuthor());
result.setComments(metadata.getComments());
Expand All @@ -88,7 +88,7 @@ static VersionedFlowSnapshotMetadata convert(final RegisteredFlowSnapshotMetadat
final VersionedFlowSnapshotMetadata result = new VersionedFlowSnapshotMetadata();
result.setBucketIdentifier(metadata.getBucketIdentifier());
result.setFlowIdentifier(metadata.getFlowIdentifier());
result.setVersion(metadata.getVersion());
result.setVersion(Integer.parseInt(metadata.getVersion()));
result.setTimestamp(metadata.getTimestamp());
result.setAuthor(metadata.getAuthor());
result.setComments(metadata.getComments());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class VersionControlInformationDTO {
private String flowId;
private String flowName;
private String flowDescription;
private Integer version;
private String version;
private String storageLocation;
private String state;
private String stateExplanation;
Expand Down Expand Up @@ -116,11 +116,11 @@ public void setFlowDescription(String flowDescription) {
}

@Schema(description = "The version of the flow")
public Integer getVersion() {
public String getVersion() {
return version;
}

public void setVersion(final Integer version) {
public void setVersion(final String version) {
this.version = version;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p
final String registryId = determineRegistryId(remoteCoordinates);
final String bucketId = remoteCoordinates.getBucketId();
final String flowId = remoteCoordinates.getFlowId();
final int version = remoteCoordinates.getVersion();
final String version = remoteCoordinates.getVersion();
final String storageLocation = remoteCoordinates.getStorageLocation();

final FlowRegistryClientNode flowRegistry = context.getFlowManager().getFlowRegistryClient(registryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3505,7 +3505,7 @@ public String getRegistryName() {
}

private boolean isModified() {
if (versionControlInformation.getVersion() == 0) {
if (versionControlInformation.getVersion() == null) {
return true;
}

Expand Down Expand Up @@ -3741,7 +3741,7 @@ public void synchronizeWithFlowRegistry(final FlowManager flowManager) {
}

final VersionedProcessGroup snapshot = vci.getFlowSnapshot();
if (snapshot == null && vci.getVersion() > 0) {
if (snapshot == null && vci.getVersion() != null) {
// We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry.
// This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry.
try {
Expand Down Expand Up @@ -3774,15 +3774,15 @@ public void synchronizeWithFlowRegistry(final FlowManager flowManager) {

try {
final RegisteredFlow versionedFlow = flowRegistry.getFlow(FlowRegistryClientContextFactory.getAnonymousContext(), vci.getBucketIdentifier(), vci.getFlowIdentifier());
final int latestVersion = (int) versionedFlow.getVersionCount();
final String latestVersion = flowRegistry.getLatestVersion(FlowRegistryClientContextFactory.getAnonymousContext(), vci.getBucketIdentifier(), vci.getFlowIdentifier()).orElse(null);
vci.setBucketName(versionedFlow.getBucketName());
vci.setFlowName(versionedFlow.getName());
vci.setFlowDescription(versionedFlow.getDescription());
vci.setRegistryName(flowRegistry.getName());

if (latestVersion == vci.getVersion()) {
if (Objects.equals(latestVersion, vci.getVersion())) {
versionControlFields.setStale(false);
if (latestVersion == 0) {
if (latestVersion == null) {
LOG.debug("{} does not have any version in the Registry", this);
versionControlFields.setLocallyModified(true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,15 @@ public RegisteredFlowSnapshot registerFlowSnapshot(
final Map<String, VersionedParameterContext> parameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences,
final String comments,
final int expectedVersion
final String expectedVersion,
final RegisterAction registerAction
) throws FlowRegistryException, IOException {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Snapshot for flow {} is checked for violations before commit", snapshot.getInstanceIdentifier());
}

if (analyzeProcessGroupToRegister(snapshot)) {
return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion);
return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion, registerAction);
} else {
throw new FlowRegistryPreCommitException("There are unresolved rule violations");
}
Expand Down Expand Up @@ -431,7 +432,7 @@ public Set<RegisteredFlow> getFlows(final FlowRegistryClientUserContext context,

@Override
public FlowSnapshotContainer getFlowContents(
final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows
final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final String version, final boolean fetchRemoteFlows
) throws FlowRegistryException, IOException {
return node.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
}
Expand All @@ -442,7 +443,7 @@ public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final FlowRegistryCli
}

@Override
public int getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
public Optional<String> getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
return node.getLatestVersion(context, bucketId, flowId);
}

Expand Down
Loading

0 comments on commit c0524c6

Please sign in to comment.