From 030dbf50be73091ed421d0f05ad591cc48a8fdb5 Mon Sep 17 00:00:00 2001 From: Stefan Armbruster Date: Tue, 3 Apr 2018 16:17:45 +0200 Subject: [PATCH] fix #329: explicitly remove deleted nodes from manual indexes --- .../IndexUpdateTransactionEventHandler.java | 104 +++++++++++++----- ...ndexUpdateTransactionEventHandlerTest.java | 38 ++++++- 2 files changed, 109 insertions(+), 33 deletions(-) diff --git a/src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java b/src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java index 47675de484..cbb2cf44f7 100644 --- a/src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java +++ b/src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java @@ -14,7 +14,6 @@ import org.neo4j.helpers.collection.Iterables; import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.internal.GraphDatabaseAPI; -import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.logging.Log; import java.util.*; @@ -27,7 +26,6 @@ import java.util.stream.Stream; import static org.neo4j.helpers.collection.Iterables.stream; -import static org.neo4j.helpers.collection.Iterators.*; /** * a transaction event handler that updates manual indexes based on configuration in graph properties @@ -40,18 +38,22 @@ public class IndexUpdateTransactionEventHandler extends TransactionEventHandler. private final GraphDatabaseService graphDatabaseService; private final boolean async; - private final BlockingQueue> indexCommandQueue ; + private final BlockingQueue> indexCommandQueue; private final boolean stopWatchEnabled; private final Log log; private Map>>> indexesByLabelAndProperty; private ScheduledFuture configUpdateFuture = null; + // "magic" command for queue to perform a tx rollover + private static boolean forceTxRolloverFlag = false; + private static final Consumer FORCE_TX_ROLLOVER = aVoid -> forceTxRolloverFlag = true; + public IndexUpdateTransactionEventHandler(GraphDatabaseAPI graphDatabaseService, Log log, boolean async, int queueCapacity, boolean stopWatchEnabled) { this.graphDatabaseService = graphDatabaseService; this.log = log; this.async = async; - this.indexCommandQueue = new LinkedBlockingQueue<>(queueCapacity); this.stopWatchEnabled = stopWatchEnabled; + this.indexCommandQueue = async ? new LinkedBlockingQueue<>(queueCapacity) : null; } public BlockingQueue> getIndexCommandQueue() { @@ -63,7 +65,7 @@ interface IndexFunction { void apply (A a, B b, C c, D d, E e); } - private Object logDuration(String message, Supplier supplier) { + private T logDuration(String message, Supplier supplier) { if (stopWatchEnabled) { StopWatch sw = new StopWatch(); try { @@ -82,7 +84,7 @@ private Object logDuration(String message, Supplier supplier) { @Override public Collection> beforeCommit(TransactionData data) throws Exception { - return (Collection>) logDuration("beforeCommit", () -> { + return logDuration("beforeCommit", () -> { getIndexesByLabelAndProperty(); Collection> state = async ? new LinkedList<>() : null; @@ -96,14 +98,13 @@ public Collection> beforeCommit(TransactionData data) throws Exce })); // filter out removedNodeProperties from node deletions - iterateNodePropertyChange(stream(data.removedNodeProperties()).filter(nodePropertyEntry -> !contains(data.deletedNodes().iterator(), nodePropertyEntry.entity())), true, (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> { + iterateNodePropertyChange(stream(data.removedNodeProperties()).filter(nodePropertyEntry -> !data.isDeleted(nodePropertyEntry.entity())), true, (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> { index.remove(node, key); index.remove(node, FreeTextSearch.KEY); })); - // performance tweak: converted create/deleted nodes to a set, so we can apply `contains` on it fast + // performance tweak: converted created nodes to a set, so we can apply `contains` on it fast final Set createdNodes = Iterables.asSet(data.createdNodes()); - final Set deletedNodes = Iterables.asSet(data.deletedNodes()); iterateLabelChanges( stream(data.assignedLabels()).filter( labelEntry -> !createdNodes.contains( labelEntry.node() ) ), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> { @@ -112,17 +113,16 @@ public Collection> beforeCommit(TransactionData data) throws Exce })); iterateLabelChanges( - stream(data.removedLabels()).filter( labelEntry -> !deletedNodes.contains( labelEntry.node() ) ), + stream(data.removedLabels()).filter( labelEntry -> !data.isDeleted(labelEntry.node()) ), (index, node, key, value, ignore) -> indexUpdate(state, aVoid -> { index.remove(node, key); index.remove(node, FreeTextSearch.KEY); })); + iterateNodeDeletions(stream(data.removedLabels()).filter( labelEntry -> data.isDeleted(labelEntry.node())), + (nodeIndex, node, void1, void2, void3) -> indexUpdate(state, aVoid -> nodeIndex.remove(node))); return state; - }); - - } @Override @@ -183,14 +183,27 @@ private void iterateLabelChanges(Stream stream, IndexFunction stream, IndexFunction, Node, Void, Void, Void> function) { + stream.forEach(labelEntry -> { + final Map>> propertyIndicesMap = indexesByLabelAndProperty.get(labelEntry.label().name()); + if (propertyIndicesMap!=null) { + for (Collection> indices: propertyIndicesMap.values()) { + for (Index index: indices) { + function.apply(index, labelEntry.node(), null, null, null); + } + } + } + }); + } + /** * in async mode add the index action to a collection for consumption in {@link #afterCommit(TransactionData, Collection)}, in sync mode, run it directly */ private Void indexUpdate(Collection> state, Consumer indexAction) { - if (state==null) { // sync - indexAction.accept(null); - } else { // async + if (async) { state.add(indexAction); + } else { + indexAction.accept(null); } return null; } @@ -215,7 +228,6 @@ private synchronized Map>>> initIndex for (String indexName : indexManager.nodeIndexNames()) { final Index index = indexManager.forNodes(indexName); - Map indexConfig = indexManager.getConfiguration(index); if (Util.toBoolean(indexConfig.get("autoUpdate"))) { @@ -276,8 +288,18 @@ private void startIndexTrackingThread(GraphDatabaseAPI db, BlockingQueue indexCommand = indexCommandQueue.poll(millisRollover, TimeUnit.MILLISECONDS); + + if (availabilityGuard.isShutdown()) { + log.debug("shutdown in progress. Aborting index tracking thread."); + break; + } + long now = System.currentTimeMillis(); - if ((opsCount>0) && ((now - lastCommit > millisRollover) || (opsCount >= opsCountRollover))) { + if ( + FORCE_TX_ROLLOVER.equals(indexCommand) || + ((opsCount>0) && ((now - lastCommit > millisRollover) || (opsCount >= opsCountRollover))) + + ) { tx.success(); tx.close(); tx = db.beginTx(); @@ -286,16 +308,9 @@ private void startIndexTrackingThread(GraphDatabaseAPI db, BlockingQueue data() { + return Arrays.asList(new Object[][] { + { false }, + { true} + }); + } + + @Parameterized.Parameter(value = 0) + public boolean asyncIndexUpdates; + @Before public void setUp() throws Exception { - db = new TestGraphDatabaseFactory().newImpermanentDatabaseBuilder() + db = new TestGraphDatabaseFactory() + .newImpermanentDatabaseBuilder() .setConfig("apoc.autoIndex.enabled", "true") + .setConfig("apoc.autoIndex.async", Boolean.toString(asyncIndexUpdates)) + .setConfig("apoc.autoIndex.async_rollover_millis", "200") .newGraphDatabase(); TestUtil.registerProcedure(db, FreeTextSearch.class); + TestUtil.registerProcedure(db, FulltextIndex.class); + + final ApocKernelExtensionFactory.ApocLifecycle apocLifecycle = ((GraphDatabaseAPI) db).getDependencyResolver().resolveDependency(ApocKernelExtensionFactory.ApocLifecycle.class); + indexUpdateTransactionEventHandler = apocLifecycle.getIndexUpdateLifeCycle().getIndexUpdateTransactionEventHandler(); } @After @@ -29,24 +56,26 @@ public void tearDown() { } @Test - @Ignore public void shouldDeletingIndexedNodesSucceed() { // setup: create index, add a node testCallEmpty(db, "call apoc.index.addAllNodesExtended('search_index',{City:['name']},{autoUpdate:true})", null); testCallEmpty(db, "create (c:City{name:\"Made Up City\",url:\"/places/nowhere/made-up-city\"})", null); + indexUpdateTransactionEventHandler.forceTxRollover(); // check if we find the node testCallCount(db, "start n=node:search_index('City.name:\"Made Up\"') return n", null, 1); // when TestUtil.testCall(db, "match (c:City{name:'Made Up City'}) delete c return count(c) as count", map -> assertEquals(1L, map.get("count"))); + indexUpdateTransactionEventHandler.forceTxRollover(); // nothing found in the index after deletion testCallCount(db, "start n=node:search_index('City.name:\"Made Up\"') return n", null, 0); } + @Test - public void shouldIndexFieldsBeUsedConsistently() { + public void shouldIndexFieldsBeUsedConsistently() throws InterruptedException { // setup: add a node, index it and add another node testCallEmpty(db, "create (c:City{name:\"Made Up City\",url:\"/places/nowhere/made-up-city\"})", null); @@ -55,6 +84,7 @@ public void shouldIndexFieldsBeUsedConsistently() { testCallCount(db, "create (c:City{name:\"Made Up City 2\",url:\"/places/nowhere/made-up-city\"}) return c", null, 1); + indexUpdateTransactionEventHandler.forceTxRollover(); // when & then testCallCount(db, "call apoc.index.search('search_index', 'City.name:Made') yield node, weight return node, weight", null, 2); testCallCount(db, "start n=node:search_index('name:\"Made Up\"') return n", null, 0);