Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix UUID bug #3828

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions full/src/main/java/apoc/uuid/UUIDHandlerNewProcedures.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,4 @@ public static ResourceIterator<Node> getUuidNodes(Transaction tx, String databas
public static ResourceIterator<Node> getUuidNodes(Transaction tx, String databaseName, Map<String, Object> props) {
return getSystemNodes(tx, databaseName, SystemLabels.ApocUuid, props);
}

public static void createConstraintUuid(Transaction tx, String label, String propertyName) {
tx.execute(
String.format("CREATE CONSTRAINT IF NOT EXISTS FOR (n:%s) REQUIRE (n.%s) IS UNIQUE",
Util.quote(label),
Util.quote(propertyName))
);
}
}
63 changes: 31 additions & 32 deletions full/src/main/java/apoc/uuid/UuidHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -65,13 +66,15 @@ public class UuidHandler extends LifecycleAdapter implements TransactionEventLis
private final Log log;
private final DatabaseManagementService databaseManagementService;
private final ApocConfig apocConfig;
private final ConcurrentHashMap<String, UuidConfig> configuredLabelAndPropertyNames = new ConcurrentHashMap<>();
// Snapshot of uuid configuration per label/property.
// The containing map is immutable!
private final AtomicReference<Map<String, UuidConfig>> labelAndPropertyNamesSnapshot = new AtomicReference<>(Map.of());
private final ApocConfig.UuidFormatType uuidFormat;
private final JobScheduler jobScheduler;
private final Pools pools;

private JobHandle refreshUuidHandle;
private long lastUpdate;
private volatile long lastUpdate;

public static final String APOC_UUID_REFRESH = "apoc.uuid.refresh";

Expand All @@ -97,7 +100,7 @@ public void start() {
Integer uuidRefresh = apocConfig.getConfig().getInteger(APOC_UUID_REFRESH, null);
if (uuidRefresh != null) {
refreshUuidHandle = jobScheduler.scheduleRecurring(Group.STORAGE_MAINTENANCE, () -> {
if (getLastUpdate(db.databaseName(), SystemLabels.ApocUuidMeta) > lastUpdate) {
if (getLastUpdate(db.databaseName(), SystemLabels.ApocUuidMeta) >= lastUpdate) {
refreshAndAdd();
}
},
Expand Down Expand Up @@ -150,7 +153,7 @@ public Void beforeCommit(TransactionData txData, Transaction transaction, GraphD
Iterable<PropertyEntry<Node>> assignedNodeProperties = txData.assignedNodeProperties();
Iterable<PropertyEntry<Node>> removedNodeProperties = txData.removedNodeProperties();

configuredLabelAndPropertyNames.forEach((label, config) -> {
labelAndPropertyNamesSnapshot.get().forEach((label, config) -> {
final String propertyName = config.getUuidProperty();
List<Node> nodes = config.isAddToSetLabels()
? StreamSupport.stream(txData.assignedLabels().spliterator(), false).map(LabelEntry::node).collect(Collectors.toList())
Expand Down Expand Up @@ -216,8 +219,6 @@ public void add(Transaction tx, String label, UuidConfig config) {
final String propertyName = config.getUuidProperty();
checkConstraintUuid(tx, label, propertyName);

configuredLabelAndPropertyNames.put(label, config);

try (Transaction sysTx = apocConfig.getSystemDb().beginTx()) {
Node node = Util.mergeNode(sysTx, SystemLabels.ApocUuid, null,
Pair.of(SystemPropertyKeys.database.name(), db.databaseName()),
Expand All @@ -227,20 +228,17 @@ public void add(Transaction tx, String label, UuidConfig config) {
node.setProperty(SystemPropertyKeys.addToSetLabel.name(), config.isAddToSetLabels());
sysTx.commit();
}
refresh();
}

public Map<String, UuidConfig> list() {
checkEnabled();
return configuredLabelAndPropertyNames;
return labelAndPropertyNamesSnapshot.get();
}


// we cannot refresh global configuredLabelAndPropertyNames before the forEach about `setExistingNodes`
// otherwise some tests like `UUIDNewProceduresTest.testUUIDSetUuidToEmptyAndRestore` could be flaky
// because we could populate the forEach after the beforeCommit
public synchronized void refreshAndAdd() {
configuredLabelAndPropertyNames.clear();
ConcurrentHashMap<String, UuidConfig> localCache = provisionalRefresh();
final var start = System.currentTimeMillis();
final var localCache = provisionalRefresh();

if (Util.isWriteableInstance(db)) {
// add to existing nodes
Expand All @@ -267,47 +265,48 @@ public synchronized void refreshAndAdd() {
});
}

configuredLabelAndPropertyNames.putAll(localCache);
labelAndPropertyNamesSnapshot.set(localCache);
lastUpdate = start;
}

public ConcurrentHashMap<String, UuidConfig> provisionalRefresh() {
ConcurrentHashMap<String, UuidConfig> localCache = new ConcurrentHashMap<>();

lastUpdate = System.currentTimeMillis();
public Map<String, UuidConfig> provisionalRefresh() {
try (Transaction tx = apocConfig.getSystemDb().beginTx()) {
tx.findNodes(SystemLabels.ApocUuid, SystemPropertyKeys.database.name(), db.databaseName())
.forEachRemaining(node -> {
final UuidConfig config = new UuidConfig(Map.of(
return tx.findNodes(SystemLabels.ApocUuid, SystemPropertyKeys.database.name(), db.databaseName())
.stream()
.collect(Collectors.toUnmodifiableMap(
node -> (String) node.getProperty(SystemPropertyKeys.label.name()),
node -> new UuidConfig(Map.of(
UUID_PROPERTY_KEY, node.getProperty(SystemPropertyKeys.propertyName.name()),
ADD_TO_SET_LABELS_KEY, node.getProperty(SystemPropertyKeys.addToSetLabel.name(), false),
ADD_TO_EXISTING_NODES_KEY, node.getProperty(SystemPropertyKeys.addToExistingNodes.name(), false)
));
localCache.put((String)node.getProperty(SystemPropertyKeys.label.name()), config);
});
tx.commit();
))
));
}
return localCache;
}

public void refresh() {
ConcurrentHashMap<String, UuidConfig> localCache = provisionalRefresh();
configuredLabelAndPropertyNames.clear();
configuredLabelAndPropertyNames.putAll(localCache);
final var start = System.currentTimeMillis();
// Note, there is a race condition here where lastUpdate can become larger than the last update
// if two threads are refreshing at the same time.
labelAndPropertyNamesSnapshot.set(provisionalRefresh());
lastUpdate = start;
}

public synchronized UuidConfig remove(String label) {
final var oldValue = labelAndPropertyNamesSnapshot.get().get(label);
try (Transaction tx = apocConfig.getSystemDb().beginTx()) {
tx.findNodes(SystemLabels.ApocUuid, SystemPropertyKeys.database.name(), db.databaseName(),
SystemPropertyKeys.label.name(), label)
.forEachRemaining(node -> node.delete());
tx.commit();
}
return configuredLabelAndPropertyNames.remove(label);
refresh();
return oldValue;
}

public synchronized Map<String, UuidConfig> removeAll() {
Map<String, UuidConfig> retval = new HashMap<>(configuredLabelAndPropertyNames);
configuredLabelAndPropertyNames.clear();
final var retval = labelAndPropertyNamesSnapshot.get();
labelAndPropertyNamesSnapshot.set(Map.of());
try (Transaction tx = apocConfig.getSystemDb().beginTx()) {
tx.findNodes(SystemLabels.ApocUuid, SystemPropertyKeys.database.name(), db.databaseName() )
.forEachRemaining(node -> node.delete());
Expand Down
1 change: 0 additions & 1 deletion test-utils/src/main/java/apoc/util/TestContainerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ public static Neo4jContainerExtension createNeo4jContainer(List<ApocPackage> apo

Neo4jContainerExtension neo4jContainer = new Neo4jContainerExtension(dockerImage)
.withPlugins(MountableFile.forHostPath(pluginsFolder.toPath()))
.withTmpFs(Map.of("/logs", "rw", "/data", "rw", pluginsFolder.toPath().toAbsolutePath().toString(), "rw"))
.withAdminPassword(password)
.withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
.withEnv("apoc.export.file.enabled", "true")
Expand Down
Loading