Skip to content

Commit

Permalink
[olbYUBeG] Added meta-data to batches in periodic iterate (#610)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoelBergstrand authored Apr 4, 2024
1 parent f7538ac commit 79187f3
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
11 changes: 11 additions & 0 deletions core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.neo4j.graphdb.schema.ConstraintDefinition;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.logging.Log;
import org.neo4j.procedure.*;

Expand Down Expand Up @@ -307,6 +308,13 @@ public Stream<BatchAndTotalResult> iterate(
"retries", 0)); // todo sleep/delay or push to end of batch to try again or immediate ?
int failedParams = Util.toInteger(config.getOrDefault("failedParams", -1));

final Map<String, Object> metaData;
if(tx instanceof InternalTransaction iTx){
metaData = iTx.kernelTransaction().getMetaData();
} else {
metaData = Map.of();
}

BatchMode batchMode = BatchMode.fromConfig(config);
Map<String, Object> params = (Map<String, Object>) config.getOrDefault("params", Collections.emptyMap());

Expand All @@ -333,6 +341,9 @@ public Stream<BatchAndTotalResult> iterate(
retries,
result,
(tx, p) -> {
if(tx instanceof InternalTransaction iTx){
iTx.setMetaData(metaData);
}
final Result r = tx.execute(innerStatement, merge(params, p));
Iterators.count(r); // XXX: consume all results
return r.getQueryStatistics();
Expand Down
58 changes: 58 additions & 0 deletions it/src/test/java/apoc/it/core/PeriodicIterateTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package apoc.it.core;

import apoc.util.Neo4jContainerExtension;
import apoc.util.TestContainerUtil;
import apoc.util.TestUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.neo4j.driver.Session;
import org.neo4j.driver.TransactionConfig;

import java.util.List;
import java.util.Map;

import static apoc.util.TestContainerUtil.createDB;
import static apoc.util.TestContainerUtil.dockerImageForNeo4j;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;

public class PeriodicIterateTest {

// The Query Log is only accessible on Enterprise
@Test
public void check_metadata_in_batches() {
try {
Neo4jContainerExtension neo4jContainer = createDB(
TestContainerUtil.Neo4jVersion.ENTERPRISE, List.of(TestContainerUtil.ApocPackage.CORE), !TestUtil.isRunningInCI())
.withNeo4jConfig("dbms.transaction.timeout", "60s");

neo4jContainer.start();


Session session = neo4jContainer.getSession();
session.run(
"CALL apoc.periodic.iterate(\"MATCH (p:Person) RETURN p\"," +
"\"SET p.name='test'\"," +
"{batchSize:1, parallel:false})",
TransactionConfig.builder()
.withMetadata(Map.of("shouldAppear", "inBatches"))
.build()
).stream().count();
var queryLogs = neo4jContainer.queryLogs();
assertTrue(queryLogs.contains("SET p.name='test' - {_batch: [], _count: 0} - runtime=pipelined - {shouldAppear: 'inBatches'}"));
session.close();
neo4jContainer.close();
} catch (Exception ex) {
// if Testcontainers wasn't able to retrieve the docker image we ignore the test
if (TestContainerUtil.isDockerImageAvailable(ex)) {
ex.printStackTrace();
fail("Should not have thrown exception when trying to start Neo4j: " + ex);
} else if (!TestUtil.isRunningInCI()) {
fail("The docker image " + dockerImageForNeo4j(TestContainerUtil.Neo4jVersion.ENTERPRISE)
+ " could not be loaded. Check whether it's available locally / in the CI. Exception:"
+ ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static apoc.util.TestContainerUtil.Neo4jVersion;
import static apoc.util.TestContainerUtil.Neo4jVersion.ENTERPRISE;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -109,6 +110,7 @@ public void dumpLogs() {
System.err.println(execInContainer("cat", "logs/debug.log").toString());
System.err.println(execInContainer("cat", "logs/http.log").toString());
System.err.println(execInContainer("cat", "logs/security.log").toString());
System.err.println(execInContainer("cat", "logs/query.log").toString());
} else {
System.err.println("Failed to find logs");
}
Expand All @@ -117,6 +119,10 @@ public void dumpLogs() {
}
}

public String queryLogs() throws IOException, InterruptedException {
return execInContainer("cat", "logs/query.log").toString();
}

private void executeScript(String filePath) {
InputStream resource = Thread.currentThread().getContextClassLoader().getResourceAsStream(filePath);
if (resource == null) {
Expand Down

0 comments on commit 79187f3

Please sign in to comment.