Skip to content

Commit

Permalink
fix #329: explicitly remove deleted nodes from manual indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
sarmbruster committed Apr 3, 2018
1 parent 433ad36 commit 030dbf5
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 33 deletions.
104 changes: 75 additions & 29 deletions src/main/java/apoc/index/IndexUpdateTransactionEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.Log;

import java.util.*;
Expand All @@ -27,7 +26,6 @@
import java.util.stream.Stream;

import static org.neo4j.helpers.collection.Iterables.stream;
import static org.neo4j.helpers.collection.Iterators.*;

/**
* a transaction event handler that updates manual indexes based on configuration in graph properties
Expand All @@ -40,18 +38,22 @@ public class IndexUpdateTransactionEventHandler extends TransactionEventHandler.
private final GraphDatabaseService graphDatabaseService;
private final boolean async;

private final BlockingQueue<Consumer<Void>> indexCommandQueue ;
private final BlockingQueue<Consumer<Void>> indexCommandQueue;
private final boolean stopWatchEnabled;
private final Log log;
private Map<String, Map<String, Collection<Index<Node>>>> indexesByLabelAndProperty;
private ScheduledFuture<?> configUpdateFuture = null;

// "magic" command for queue to perform a tx rollover
private static boolean forceTxRolloverFlag = false;
private static final Consumer<Void> FORCE_TX_ROLLOVER = aVoid -> forceTxRolloverFlag = true;

public IndexUpdateTransactionEventHandler(GraphDatabaseAPI graphDatabaseService, Log log, boolean async, int queueCapacity, boolean stopWatchEnabled) {
this.graphDatabaseService = graphDatabaseService;
this.log = log;
this.async = async;
this.indexCommandQueue = new LinkedBlockingQueue<>(queueCapacity);
this.stopWatchEnabled = stopWatchEnabled;
this.indexCommandQueue = async ? new LinkedBlockingQueue<>(queueCapacity) : null;
}

public BlockingQueue<Consumer<Void>> getIndexCommandQueue() {
Expand All @@ -63,7 +65,7 @@ interface IndexFunction<A, B, C, D, E> {
void apply (A a, B b, C c, D d, E e);
}

private Object logDuration(String message, Supplier supplier) {
private <T> T logDuration(String message, Supplier<T> supplier) {
if (stopWatchEnabled) {
StopWatch sw = new StopWatch();
try {
Expand All @@ -82,7 +84,7 @@ private Object logDuration(String message, Supplier supplier) {
@Override
public Collection<Consumer<Void>> beforeCommit(TransactionData data) throws Exception {

return (Collection<Consumer<Void>>) logDuration("beforeCommit", () -> {
return logDuration("beforeCommit", () -> {
getIndexesByLabelAndProperty();
Collection<Consumer<Void>> state = async ? new LinkedList<>() : null;

Expand All @@ -96,14 +98,13 @@ public Collection<Consumer<Void>> beforeCommit(TransactionData data) throws Exce
}));

// filter out removedNodeProperties from node deletions
iterateNodePropertyChange(stream(data.removedNodeProperties()).filter(nodePropertyEntry -> !contains(data.deletedNodes().iterator(), nodePropertyEntry.entity())), true, (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> {
iterateNodePropertyChange(stream(data.removedNodeProperties()).filter(nodePropertyEntry -> !data.isDeleted(nodePropertyEntry.entity())), true, (index, node, key, value, oldValue) -> indexUpdate(state, aVoid -> {
index.remove(node, key);
index.remove(node, FreeTextSearch.KEY);
}));

// performance tweak: converted create/deleted nodes to a set, so we can apply `contains` on it fast
// performance tweak: converted created nodes to a set, so we can apply `contains` on it fast
final Set<Node> createdNodes = Iterables.asSet(data.createdNodes());
final Set<Node> deletedNodes = Iterables.asSet(data.deletedNodes());
iterateLabelChanges(
stream(data.assignedLabels()).filter( labelEntry -> !createdNodes.contains( labelEntry.node() ) ),
(index, node, key, value, ignore) -> indexUpdate(state, aVoid -> {
Expand All @@ -112,17 +113,16 @@ public Collection<Consumer<Void>> beforeCommit(TransactionData data) throws Exce
}));

iterateLabelChanges(
stream(data.removedLabels()).filter( labelEntry -> !deletedNodes.contains( labelEntry.node() ) ),
stream(data.removedLabels()).filter( labelEntry -> !data.isDeleted(labelEntry.node()) ),
(index, node, key, value, ignore) -> indexUpdate(state, aVoid -> {
index.remove(node, key);
index.remove(node, FreeTextSearch.KEY);
}));

iterateNodeDeletions(stream(data.removedLabels()).filter( labelEntry -> data.isDeleted(labelEntry.node())),
(nodeIndex, node, void1, void2, void3) -> indexUpdate(state, aVoid -> nodeIndex.remove(node)));
return state;

});


}

@Override
Expand Down Expand Up @@ -183,14 +183,27 @@ private void iterateLabelChanges(Stream<LabelEntry> stream, IndexFunction<Index<
});
}

private void iterateNodeDeletions(Stream<LabelEntry> stream, IndexFunction<Index<Node>, Node, Void, Void, Void> function) {
stream.forEach(labelEntry -> {
final Map<String, Collection<Index<Node>>> propertyIndicesMap = indexesByLabelAndProperty.get(labelEntry.label().name());
if (propertyIndicesMap!=null) {
for (Collection<Index<Node>> indices: propertyIndicesMap.values()) {
for (Index<Node> index: indices) {
function.apply(index, labelEntry.node(), null, null, null);
}
}
}
});
}

/**
* in async mode add the index action to a collection for consumption in {@link #afterCommit(TransactionData, Collection)}, in sync mode, run it directly
*/
private Void indexUpdate(Collection<Consumer<Void>> state, Consumer<Void> indexAction) {
if (state==null) { // sync
indexAction.accept(null);
} else { // async
if (async) {
state.add(indexAction);
} else {
indexAction.accept(null);
}
return null;
}
Expand All @@ -215,7 +228,6 @@ private synchronized Map<String, Map<String, Collection<Index<Node>>>> initIndex
for (String indexName : indexManager.nodeIndexNames()) {

final Index<Node> index = indexManager.forNodes(indexName);

Map<String, String> indexConfig = indexManager.getConfiguration(index);

if (Util.toBoolean(indexConfig.get("autoUpdate"))) {
Expand Down Expand Up @@ -276,8 +288,18 @@ private void startIndexTrackingThread(GraphDatabaseAPI db, BlockingQueue<Consume
long lastCommit = System.currentTimeMillis();
while (true) {
Consumer<Void> indexCommand = indexCommandQueue.poll(millisRollover, TimeUnit.MILLISECONDS);

if (availabilityGuard.isShutdown()) {
log.debug("shutdown in progress. Aborting index tracking thread.");
break;
}

long now = System.currentTimeMillis();
if ((opsCount>0) && ((now - lastCommit > millisRollover) || (opsCount >= opsCountRollover))) {
if (
FORCE_TX_ROLLOVER.equals(indexCommand) ||
((opsCount>0) && ((now - lastCommit > millisRollover) || (opsCount >= opsCountRollover)))

) {
tx.success();
tx.close();
tx = db.beginTx();
Expand All @@ -286,16 +308,9 @@ private void startIndexTrackingThread(GraphDatabaseAPI db, BlockingQueue<Consume
opsCount = 0;
}
if (indexCommand == null) {
boolean running = db.getDependencyResolver().resolveDependency(LifeSupport.class).isRunning();
if (running) {
// in case we couldn't get anything from queue, we'll update lastcommit to prevent too early commits
if (opsCount==0) {
lastCommit = now;
}
} else {
// check if a database shutdown is already in progress, if so, terminate this thread
log.info("system shutdown detected, terminating indexing background thread");
break;
// in case we couldn't get anything from queue, we'll update lastcommit to prevent too early commits
if (opsCount == 0) {
lastCommit = now;
}
} else {
opsCount++;
Expand All @@ -305,10 +320,18 @@ private void startIndexTrackingThread(GraphDatabaseAPI db, BlockingQueue<Consume
} catch (InterruptedException|AvailabilityGuard.UnavailableException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
if (tx!=null) {
tx.success();
tx.close();
try {
tx.close();
log.debug("final commit in background thread");
} catch (TransactionTerminatedException e) {
log.error(e.getMessage(), e);
}
}
log.info("stopping background thread for async index updates");
}
Expand All @@ -328,6 +351,11 @@ public void resetConfiguration() {
indexUpdateTransactionEventHandler.resetConfiguration();
}
}

public IndexUpdateTransactionEventHandler getIndexUpdateTransactionEventHandler() {
return indexUpdateTransactionEventHandler;
}

}

private void startPeriodicIndexConfigChangeUpdates(long indexConfigUpdateInternal) {
Expand All @@ -341,4 +369,22 @@ private void stopPeriodicIndexConfigChangeUpdates() {
}
}

/**
* to be used from unit tests to ensure a tx rollover has happenend
*/
public synchronized void forceTxRollover() {
if (async) {
try {
forceTxRolloverFlag = false;
indexCommandQueue.put(FORCE_TX_ROLLOVER);
while (!forceTxRolloverFlag) {
Thread.sleep(5);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}

}
Original file line number Diff line number Diff line change
@@ -1,26 +1,53 @@
package apoc.index;

import apoc.ApocKernelExtensionFactory;
import apoc.util.TestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.test.TestGraphDatabaseFactory;

import java.util.Arrays;
import java.util.Collection;

import static apoc.util.TestUtil.*;
import static org.junit.Assert.*;

@RunWith(Parameterized.class)
public class IndexUpdateTransactionEventHandlerTest {

private GraphDatabaseService db;
private IndexUpdateTransactionEventHandler indexUpdateTransactionEventHandler;

@Parameterized.Parameters(name = "async index updates {0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ false },
{ true}
});
}

@Parameterized.Parameter(value = 0)
public boolean asyncIndexUpdates;


@Before
public void setUp() throws Exception {
db = new TestGraphDatabaseFactory().newImpermanentDatabaseBuilder()
db = new TestGraphDatabaseFactory()
.newImpermanentDatabaseBuilder()
.setConfig("apoc.autoIndex.enabled", "true")
.setConfig("apoc.autoIndex.async", Boolean.toString(asyncIndexUpdates))
.setConfig("apoc.autoIndex.async_rollover_millis", "200")
.newGraphDatabase();
TestUtil.registerProcedure(db, FreeTextSearch.class);
TestUtil.registerProcedure(db, FulltextIndex.class);

final ApocKernelExtensionFactory.ApocLifecycle apocLifecycle = ((GraphDatabaseAPI) db).getDependencyResolver().resolveDependency(ApocKernelExtensionFactory.ApocLifecycle.class);
indexUpdateTransactionEventHandler = apocLifecycle.getIndexUpdateLifeCycle().getIndexUpdateTransactionEventHandler();
}

@After
Expand All @@ -29,24 +56,26 @@ public void tearDown() {
}

@Test
@Ignore
public void shouldDeletingIndexedNodesSucceed() {
// setup: create index, add a node
testCallEmpty(db, "call apoc.index.addAllNodesExtended('search_index',{City:['name']},{autoUpdate:true})", null);
testCallEmpty(db, "create (c:City{name:\"Made Up City\",url:\"/places/nowhere/made-up-city\"})", null);
indexUpdateTransactionEventHandler.forceTxRollover();

// check if we find the node
testCallCount(db, "start n=node:search_index('City.name:\"Made Up\"') return n", null, 1);

// when
TestUtil.testCall(db, "match (c:City{name:'Made Up City'}) delete c return count(c) as count", map -> assertEquals(1L, map.get("count")));
indexUpdateTransactionEventHandler.forceTxRollover();

// nothing found in the index after deletion
testCallCount(db, "start n=node:search_index('City.name:\"Made Up\"') return n", null, 0);
}


@Test
public void shouldIndexFieldsBeUsedConsistently() {
public void shouldIndexFieldsBeUsedConsistently() throws InterruptedException {
// setup: add a node, index it and add another node
testCallEmpty(db, "create (c:City{name:\"Made Up City\",url:\"/places/nowhere/made-up-city\"})", null);

Expand All @@ -55,6 +84,7 @@ public void shouldIndexFieldsBeUsedConsistently() {
testCallCount(db, "create (c:City{name:\"Made Up City 2\",url:\"/places/nowhere/made-up-city\"}) return c",
null, 1);

indexUpdateTransactionEventHandler.forceTxRollover();
// when & then
testCallCount(db, "call apoc.index.search('search_index', 'City.name:Made') yield node, weight return node, weight", null, 2);
testCallCount(db, "start n=node:search_index('name:\"Made Up\"') return n", null, 0);
Expand Down

0 comments on commit 030dbf5

Please sign in to comment.