Skip to content

Commit

Permalink
Checkpoint - implemented sorting, but REST layer is re-sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Apr 25, 2024
1 parent 1970be3 commit 9a5673f
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.nifi.components.ConfigurableComponent;

import java.io.IOException;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -221,4 +222,16 @@ default String generateFlowId(final String flowName) {
return UUID.randomUUID().toString();
}

default Comparator<FlowRegistryBucket> getBucketComparator() {
return Comparator.comparing(FlowRegistryBucket::getName);
}

default Comparator<RegisteredFlow> getFlowComparator() {
return Comparator.comparing(RegisteredFlow::getName);
}

default Comparator<RegisteredFlowSnapshotMetadata> getFlowVersionComparator() {
return Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp)
.thenComparing(RegisteredFlowSnapshotMetadata::getVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -456,4 +457,19 @@ public String generateFlowId(final String flowName) throws IOException, FlowRegi
public void setComponent(final LoggableComponent<FlowRegistryClient> component) {
node.setComponent(component);
}

@Override
public Comparator<FlowRegistryBucket> getBucketComparator() {
return node.getBucketComparator();
}

@Override
public Comparator<RegisteredFlow> getFlowComparator() {
return node.getFlowComparator();
}

@Override
public Comparator<RegisteredFlowSnapshotMetadata> getFlowVersionComparator() {
return node.getFlowVersionComparator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,21 @@ public void setComponent(final LoggableComponent<FlowRegistryClient> component)
client.set(component);
}

@Override
public Comparator<FlowRegistryBucket> getBucketComparator() {
return client.get().getComponent().getBucketComparator();
}

@Override
public Comparator<RegisteredFlow> getFlowComparator() {
return client.get().getComponent().getFlowComparator();
}

@Override
public Comparator<RegisteredFlowSnapshotMetadata> getFlowVersionComparator() {
return client.get().getComponent().getFlowVersionComparator();
}

private <T> T execute(final FlowRegistryClientAction<T> action) throws FlowRegistryException, IOException {
final ValidationStatus validationStatus = getValidationStatus();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.nifi.flow.VersionedProcessGroup;

import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -62,4 +63,10 @@ RegisteredFlowSnapshot registerFlowSnapshot(
String generateFlowId(String flowName) throws IOException, FlowRegistryException;

void setComponent(LoggableComponent<FlowRegistryClient> component);

Comparator<FlowRegistryBucket> getBucketComparator();

Comparator<RegisteredFlow> getFlowComparator();

Comparator<RegisteredFlowSnapshotMetadata> getFlowVersionComparator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3222,7 +3222,7 @@ public Set<FlowRegistryBucketEntity> getBucketsForUser(final String registryClie

return entityFactory.createBucketEntity(dto, permissions);
})
.collect(Collectors.toSet());
.collect(Collectors.toCollection(LinkedHashSet::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public Set<FlowRegistryBucket> getBucketsForUser(final FlowRegistryClientUserCon
}

final Set<FlowRegistryBucket> buckets = flowRegistry.getBuckets(context);
final Set<FlowRegistryBucket> sortedBuckets = new TreeSet<>((b1, b2) -> b1.getName().compareTo(b2.getName()));
final Set<FlowRegistryBucket> sortedBuckets = new TreeSet<>(flowRegistry.getBucketComparator());
sortedBuckets.addAll(buckets);
return sortedBuckets;
} catch (final FlowRegistryException e) {
Expand All @@ -132,7 +132,7 @@ public Set<RegisteredFlow> getFlowsForUser(final FlowRegistryClientUserContext c
}

final Set<RegisteredFlow> flows = flowRegistry.getFlows(context, bucketId);
final Set<RegisteredFlow> sortedFlows = new TreeSet<>(Comparator.comparing(RegisteredFlow::getName).reversed());
final Set<RegisteredFlow> sortedFlows = new TreeSet<>(flowRegistry.getFlowComparator());
sortedFlows.addAll(flows);
return sortedFlows;
} catch (final IOException | FlowRegistryException ioe) {
Expand Down Expand Up @@ -163,9 +163,7 @@ public Set<RegisteredFlowSnapshotMetadata> getFlowVersionsForUser(final FlowRegi
}

final Set<RegisteredFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(context, bucketId, flowId);
final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>(
Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp)
.thenComparing(RegisteredFlowSnapshotMetadata::getVersion));
final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>(flowRegistry.getFlowVersionComparator());
sortedFlowVersions.addAll(flowVersions);
return sortedFlowVersions;
} catch (final IOException | FlowRegistryException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -112,6 +113,8 @@ public class GitHubFlowRegistryClient extends AbstractFlowRegistryClient {
static final String DEFAULT_FLOW_SNAPSHOT_COMMIT_MESSAGE = "Saving Flow Snapshot";
static final String SNAPSHOT_FILE_EXTENSION = ".json";
static final String SNAPSHOT_FILE_FORMAT = "%s" + SNAPSHOT_FILE_EXTENSION;
static final String MAIN_BUCKET = "main";
static final String MASTER_BUCKET = "master";

private volatile GitHubRepositoryClient repositoryClient;
private final AtomicBoolean initialized = new AtomicBoolean(false);
Expand Down Expand Up @@ -361,6 +364,22 @@ public String generateFlowId(final String flowName) {
.replaceAll("(-)\\1+", "$1"); // replace consecutive - with single -
}

@Override
public Comparator<FlowRegistryBucket> getBucketComparator() {
final Comparator<FlowRegistryBucket> defaultComparator = super.getBucketComparator();
return (b1, b2) -> {
if ((MAIN_BUCKET.equals(b1.getName()) && !MAIN_BUCKET.equals(b2.getName()))
|| (MASTER_BUCKET.equals(b1.getName()) && !MASTER_BUCKET.equals(b2.getName()))) {
return -1;
}
if ((MAIN_BUCKET.equals(b2.getName()) && !MAIN_BUCKET.equals(b1.getName()))
|| (MASTER_BUCKET.equals(b2.getName()) && !MASTER_BUCKET.equals(b1.getName()))) {
return 1;
}
return defaultComparator.compare(b1, b2);
};
}

private FlowRegistryBucket createFlowRegistryBucket(final String name) {
final FlowRegistryPermissions bucketPermissions = new FlowRegistryPermissions();
bucketPermissions.setCanRead(true);
Expand Down

0 comments on commit 9a5673f

Please sign in to comment.