Skip to content

Commit

Permalink
[gULAAkri] apoc.periodic.repeat doesn't always work (neo4j/apoc#388) (#…
Browse files Browse the repository at this point in the history
…3576)

* [gULAAkri] apoc.periodic.repeat doesn't always work

* [gULAAkri] added mockLog
  • Loading branch information
vga91 authored May 16, 2023
1 parent e852bd3 commit fbd0308
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 14 deletions.
3 changes: 2 additions & 1 deletion core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") Stri
validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
JobInfo info = schedule(name, () -> {
db.executeTransactionally(statement, params);
// `resultAsString` in order to consume result
db.executeTransactionally(statement, params, Result::resultAsString);
},0,rate);
return Stream.of(info);
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/apoc/periodic/PeriodicUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import apoc.util.Util;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.internal.helpers.collection.Pair;
import org.neo4j.logging.Log;
Expand Down Expand Up @@ -142,7 +143,8 @@ public static Stream<JobInfo> submitProc(String name, String statement, Map<Stri
Map<String,Object> params = (Map) config.getOrDefault("params", Collections.emptyMap());
JobInfo info = submitJob(name, () -> {
try {
db.executeTransactionally(statement, params);
// `resultAsString` in order to consume result
db.executeTransactionally(statement, params, Result::resultAsString);
} catch(Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
Expand Down
81 changes: 69 additions & 12 deletions core/src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

Expand All @@ -45,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import java.util.stream.Stream;

Expand All @@ -62,21 +68,72 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.util.Iterables.count;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class PeriodicTest {
public static class MockLogger {
@Context public Log log;

@Procedure("apoc.mockLog")
public void mockLog(@Name("value") String value) {
log.info(value);
}
}

public static final long RUNDOWN_COUNT = 1000;
public static final int BATCH_SIZE = 399;

public static AssertableLogProvider logProvider = new AssertableLogProvider();

@Rule
public DbmsRule db = new ImpermanentDbmsRule();
public DbmsRule db = new ImpermanentDbmsRule(logProvider);

@Before
public void initDb() throws Exception {
TestUtil.registerProcedure(db, Periodic.class, Schemas.class, Cypher.class);
TestUtil.registerProcedure(db, Periodic.class, Schemas.class, Cypher.class, MockLogger.class);
db.executeTransactionally("call apoc.periodic.list() yield name call apoc.periodic.cancel(name) yield name as name2 return count(*)");
}

@Test
public void testRepeatWithVoidProcedure() {
String logVal = "repeatVoid";
String query = "CALL apoc.periodic.repeat('repeat-1', 'CALL apoc.mockLog($logVal)', 1, {params: {logVal: $logVal}})";
testLogIncrease(query, logVal);
}

@Test
public void testRepeatWithVoidProcedureAndReturn() {
String logVal = "repeatVoidWithReturn";
String query = "CALL apoc.periodic.repeat('repeat-2', 'CALL apoc.mockLog($logVal) RETURN 1', 1, {params: {logVal: $logVal}})";
testLogIncrease(query, logVal);
}

@Test
public void testSubmitWithVoidProcedure() {
String logVal = "submitVoid";
String query = "CALL apoc.periodic.submit('submit-1', 'CALL apoc.mockLog($logVal) RETURN 1', {params: {logVal: $logVal}})";
testLogIncrease(query, logVal);
}

@Test
public void testSubmitWithVoidProcedureAndReturn() {
String logVal = "submitVoidWithReturn";
String query = "CALL apoc.periodic.submit('submit-1', 'CALL apoc.mockLog($logVal)', {params: {logVal: $logVal}})";
testLogIncrease(query, logVal);
}

private void testLogIncrease(String query, String logVal) {
// execute a periodic procedure with `CALL apoc.mockLog(...)` as an inner procedure
db.executeTransactionally(query, Map.of("logVal", logVal));

// check custom log in logProvider
assertEventually(() -> {
String serialize = logProvider.serialize();
return serialize.contains(logVal);
}, (val) -> val, 5L, TimeUnit.SECONDS);

}

@Test
public void testSubmitStatement() throws Exception {
String callList = "CALL apoc.periodic.list()";
Expand Down Expand Up @@ -108,15 +165,15 @@ public void testSubmitWithCreateIndexSchemaOperation() {
@Test
public void testSubmitWithSchemaProcedure() {
String errMessage = "Supported inner procedure modes for the operation are [READ, WRITE, DEFAULT]";

// built-in neo4j procedure
final String createCons = "CALL db.createUniquePropertyConstraint('uniqueConsName', ['Alpha', 'Beta'], ['foo', 'bar'], 'lucene-1.0')";
testSchemaOperationCommon(createCons, errMessage);

// apoc procedures
testSchemaOperationCommon("CALL apoc.schema.assert({}, {})", errMessage);
testSchemaOperationCommon("CALL apoc.cypher.runSchema('CREATE CONSTRAINT periodicIdx FOR (n:Bar) REQUIRE n.first_name IS UNIQUE', {})", errMessage);

// inner schema procedure
final String innerSchema = "CALL { WITH 1 AS one CALL apoc.schema.assert({}, {}) YIELD key RETURN key } " +
"IN TRANSACTIONS OF 1000 rows RETURN 1";
Expand Down Expand Up @@ -160,15 +217,15 @@ public void testSubmitStatementWithParams() throws Exception {
@Test
public void testApplyPlanner() {
assertEquals("RETURN 1", applyPlanner("RETURN 1", Periodic.Planner.DEFAULT));
assertEquals("cypher planner=cost MATCH (n:cypher) RETURN n",
assertEquals("cypher planner=cost MATCH (n:cypher) RETURN n",
applyPlanner("MATCH (n:cypher) RETURN n", Periodic.Planner.COST));
assertEquals("cypher planner=idp MATCH (n:cypher) RETURN n",
assertEquals("cypher planner=idp MATCH (n:cypher) RETURN n",
applyPlanner("MATCH (n:cypher) RETURN n", Periodic.Planner.IDP));
assertEquals("cypher planner=dp runtime=compiled MATCH (n) RETURN n",
assertEquals("cypher planner=dp runtime=compiled MATCH (n) RETURN n",
applyPlanner("cypher runtime=compiled MATCH (n) RETURN n", Periodic.Planner.DP));
assertEquals("cypher planner=dp 3.1 MATCH (n) RETURN n",
assertEquals("cypher planner=dp 3.1 MATCH (n) RETURN n",
applyPlanner("cypher 3.1 MATCH (n) RETURN n", Periodic.Planner.DP));
assertEquals("cypher planner=idp expressionEngine=compiled MATCH (n) RETURN n",
assertEquals("cypher planner=idp expressionEngine=compiled MATCH (n) RETURN n",
applyPlanner("cypher expressionEngine=compiled MATCH (n) RETURN n", Periodic.Planner.IDP));
assertEquals("cypher expressionEngine=compiled planner=cost MATCH (n) RETURN n",
applyPlanner("cypher expressionEngine=compiled planner=idp MATCH (n) RETURN n", Periodic.Planner.COST));
Expand Down Expand Up @@ -341,13 +398,13 @@ public void testIterateWithQueryPlanner() throws Exception {

String cypherIterate = "match (p:Person) return p";
String cypherAction = "SET p.lastname =p.name REMOVE p.name";
testResult(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, $config)",
testResult(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, $config)",
map("cypherIterate", cypherIterate, "cypherAction", cypherAction,
"config", map("batchSize", 10, "planner", "DP")),
result -> assertEquals(10L, Iterators.single(result).get("batches")));

String cypherActionUnwind = "cypher runtime=slotted UNWIND $_batch AS batch WITH batch.p AS p SET p.lastname =p.name";

testResult(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherActionUnwind, $config)",
map("cypherIterate", cypherIterate, "cypherActionUnwind", cypherActionUnwind,
"config", map("batchSize", 10, "batchMode", "BATCH_SINGLE", "planner", "DP")),
Expand Down

0 comments on commit fbd0308

Please sign in to comment.