From 2070ee2541a63298b1a8a7f0338c8caae0a15083 Mon Sep 17 00:00:00 2001 From: Ali Ince Date: Tue, 27 Aug 2024 11:41:42 +0100 Subject: [PATCH] fix: use new procedure names (#48) --- .../java/org/neo4j/cdc/client/CDCClient.java | 16 +-- .../org/neo4j/cdc/client/CDCClient514IT.java | 69 ----------- .../org/neo4j/cdc/client/CDCClientIT.java | 117 ++++++++++-------- .../neo4j/cdc/client/CDCClientLatestIT.java | 64 ---------- .../CDCClientWithAggressiveLogPruningIT.java | 2 +- 5 files changed, 76 insertions(+), 192 deletions(-) delete mode 100644 src/test/java/org/neo4j/cdc/client/CDCClient514IT.java delete mode 100644 src/test/java/org/neo4j/cdc/client/CDCClientLatestIT.java diff --git a/src/main/java/org/neo4j/cdc/client/CDCClient.java b/src/main/java/org/neo4j/cdc/client/CDCClient.java index 4311381..428f621 100644 --- a/src/main/java/org/neo4j/cdc/client/CDCClient.java +++ b/src/main/java/org/neo4j/cdc/client/CDCClient.java @@ -42,9 +42,9 @@ public class CDCClient implements CDCService { private final Logger log = LoggerFactory.getLogger(CDCClient.class); - private static final String CDC_EARLIEST_STATEMENT = "call cdc.earliest()"; - private static final String CDC_CURRENT_STATEMENT = "call cdc.current()"; - private static final String CDC_QUERY_STATEMENT = "call cdc.query($from, $selectors)"; + private static final String CDC_EARLIEST_STATEMENT = "call db.cdc.earliest()"; + private static final String CDC_CURRENT_STATEMENT = "call db.cdc.current()"; + private static final String CDC_QUERY_STATEMENT = "call db.cdc.query($from, $selectors)"; private final Driver driver; private final List selectors; private final SessionConfigSupplier sessionConfigSupplier; @@ -114,12 +114,12 @@ public CDCClient( @Override public Mono earliest() { - return queryForChangeIdentifier(CDC_EARLIEST_STATEMENT, "cdc.earliest"); + return queryForChangeIdentifier(CDC_EARLIEST_STATEMENT, "db.cdc.earliest"); } @Override public Mono current() { - return queryForChangeIdentifier(CDC_CURRENT_STATEMENT, "cdc.current"); + return queryForChangeIdentifier(CDC_CURRENT_STATEMENT, "db.cdc.current"); } @Override @@ -133,7 +133,7 @@ public Flux query(ChangeIdentifier from) { "selectors", selectors.stream().map(Selector::asMap).collect(Collectors.toList())); - log.trace("running cdc.query using parameters {}", params); + log.trace("running db.cdc.query using parameters {}", params); RxResult result = tx.run(CDC_QUERY_STATEMENT, params); return Flux.from(result.records()) @@ -152,7 +152,7 @@ public Flux stream(ChangeIdentifier from) { var query = Flux.usingWhen( Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())), (RxSession session) -> Flux.from(session.readTransaction(tx -> { - var current = Mono.from(tx.run("CALL cdc.current()").records()) + var current = Mono.from(tx.run("CALL db.cdc.current()").records()) .map(MapAccessor::asMap) .map(ResultMapper::parseChangeIdentifier); @@ -162,7 +162,7 @@ public Flux stream(ChangeIdentifier from) { "selectors", selectors.stream().map(Selector::asMap).collect(Collectors.toList())); - log.trace("running cdc.query using parameters {}", params); + log.trace("running db.cdc.query using parameters {}", params); RxResult result = tx.run(CDC_QUERY_STATEMENT, params); return current.flatMapMany(changeId -> Flux.from(result.records()) diff --git a/src/test/java/org/neo4j/cdc/client/CDCClient514IT.java b/src/test/java/org/neo4j/cdc/client/CDCClient514IT.java deleted file mode 100644 index 709a222..0000000 --- a/src/test/java/org/neo4j/cdc/client/CDCClient514IT.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.cdc.client; - -import java.util.Collections; -import java.util.Map; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.neo4j.driver.AuthTokens; -import org.neo4j.driver.Driver; -import org.neo4j.driver.GraphDatabase; -import org.testcontainers.containers.Neo4jContainer; -import org.testcontainers.junit.jupiter.Container; - -/** - * Neo4j 5.15+ introduced a breaking change in node ande relationship keys structure. This suite verifies if - * CDC Client is backward compatible with 5.14 and earlier. - */ -public class CDCClient514IT extends CDCClientIT { - - private static final String NEO4J_VERSION = "5.14"; - - @SuppressWarnings("resource") - @Container - private static final Neo4jContainer neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise") - .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes") - .withAdminPassword("passw0rd"); - - private static Driver driver; - - @BeforeAll - static void setup() { - driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd")); - } - - @AfterAll - static void cleanup() { - driver.close(); - } - - @Override - Driver driver() { - return driver; - } - - @Override - Neo4jContainer neo4j() { - return neo4j; - } - - @Override - Map defaultExpectedAdditionalEntries() { - return Collections.emptyMap(); - } -} diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java index bde5854..e83b84f 100644 --- a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java +++ b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java @@ -23,6 +23,8 @@ import java.time.*; import java.util.*; import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.neo4j.cdc.client.model.*; @@ -32,42 +34,57 @@ import org.neo4j.driver.*; import org.neo4j.driver.exceptions.FatalDiscoveryException; import org.testcontainers.containers.Neo4jContainer; +import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import reactor.test.StepVerifier; @Testcontainers -public abstract class CDCClientIT { +public class CDCClientIT { - abstract Driver driver(); + private static final String NEO4J_VERSION = "5"; - abstract Neo4jContainer neo4j(); + @SuppressWarnings("resource") + @Container + private static final Neo4jContainer neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise") + .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes") + .withAdminPassword("passw0rd"); - abstract Map defaultExpectedAdditionalEntries(); + private static Driver driver; + + @BeforeAll + static void setup() { + driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd")); + } + + @AfterAll + static void cleanup() { + driver.close(); + } private ChangeIdentifier current; @BeforeEach void reset() { - try (var session = driver().session(SessionConfig.forDatabase("system"))) { + try (var session = driver.session(SessionConfig.forDatabase("system"))) { session.run( "CREATE OR REPLACE DATABASE $db OPTIONS {txLogEnrichment: $mode} WAIT", Map.of("db", "neo4j", "mode", "FULL")) .consume(); } - try (var session = driver().session()) { + try (var session = driver.session()) { current = currentChangeId(session); } } private static ChangeIdentifier currentChangeId(Session session) { return new ChangeIdentifier( - session.run("CALL cdc.current()").single().get(0).asString()); + session.run("CALL db.cdc.current()").single().get(0).asString()); } @Test void earliest() { - var client = new CDCClient(driver(), Duration.ZERO); + var client = new CDCClient(driver, Duration.ZERO); StepVerifier.create(client.earliest()) .assertNext(cv -> assertNotNull(cv.getId())) @@ -76,7 +93,7 @@ void earliest() { @Test void current() { - var client = new CDCClient(driver(), Duration.ZERO); + var client = new CDCClient(driver, Duration.ZERO); StepVerifier.create(client.current()) .assertNext(cv -> assertNotNull(cv.getId())) @@ -85,9 +102,9 @@ void current() { @Test void changesCanBeQueried() { - var client = new CDCClient(driver(), Duration.ZERO); + var client = new CDCClient(driver, Duration.ZERO); - try (Session session = driver().session()) { + try (Session session = driver.session()) { session.run("CREATE ()").consume(); } @@ -99,7 +116,7 @@ void changesCanBeQueried() { @Test void respectsSessionConfigSupplier() { var client = new CDCClient( - driver(), + driver, () -> SessionConfig.builder().withDatabase("unknownDatabase").build()); StepVerifier.create(client.current()) .expectError(FatalDiscoveryException.class) @@ -108,7 +125,7 @@ void respectsSessionConfigSupplier() { @Test void shouldReturnCypherTypesWithoutConversion() { - var client = new CDCClient(driver(), Duration.ZERO); + var client = new CDCClient(driver, Duration.ZERO); var props = new HashMap(); props.put("bool", true); @@ -125,7 +142,7 @@ void shouldReturnCypherTypesWithoutConversion() { props.put("zoned_datetime", ZonedDateTime.of(1990, 5, 1, 23, 59, 59, 0, ZoneId.of("UTC"))); props.put("zoned_time", OffsetTime.of(23, 59, 59, 0, ZoneOffset.ofHours(1))); - try (Session session = driver().session()) { + try (Session session = driver.session()) { session.run("CREATE (a) SET a = $props", Map.of("props", props)).consume(); } @@ -144,9 +161,9 @@ void shouldReturnCypherTypesWithoutConversion() { @Test void metadataShouldNotHaveAdditionalEntries() { - CDCClient client = new CDCClient(driver(), Duration.ZERO); + CDCClient client = new CDCClient(driver, Duration.ZERO); - try (var session = driver().session()) { + try (var session = driver.session()) { session.run("CREATE (p:Person)", emptyMap()).consume(); StepVerifier.create(client.query(current)) @@ -158,9 +175,9 @@ void metadataShouldNotHaveAdditionalEntries() { @Test void nodeChangesCanBeQueried() { - CDCClient client = new CDCClient(driver(), Duration.ZERO); + CDCClient client = new CDCClient(driver, Duration.ZERO); - try (var session = driver().session()) { + try (var session = driver.session()) { session.run("CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.first_name, p.last_name) IS NODE KEY") .consume(); @@ -231,9 +248,9 @@ void nodeChangesCanBeQueried() { @Test void relationshipChangesCanBeQueried() { - var client = new CDCClient(driver(), Duration.ZERO); + var client = new CDCClient(driver, Duration.ZERO); - try (var session = driver().session()) { + try (var session = driver.session()) { session.run("CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.id) IS NODE KEY") .consume(); session.run("CREATE CONSTRAINT FOR (p:Place) REQUIRE (p.id) IS NODE KEY") @@ -315,7 +332,7 @@ void relationshipChangesCanBeQueried() { @Test void selectorsArePassedToServer() { - try (var session = driver().session()) { + try (var session = driver.session()) { session.run("CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.id) IS NODE KEY") .consume(); session.run("CREATE CONSTRAINT FOR (p:Place) REQUIRE (p.id) IS NODE KEY") @@ -347,7 +364,7 @@ void selectorsArePassedToServer() { .get(0) .asString(); - StepVerifier.create(new CDCClient(driver(), Duration.ZERO).query(current)) + StepVerifier.create(new CDCClient(driver, Duration.ZERO).query(current)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(person1)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(person2)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(place)) @@ -355,7 +372,7 @@ void selectorsArePassedToServer() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, NodeSelector.builder() .withOperation(EntityOperation.CREATE) @@ -366,7 +383,7 @@ void selectorsArePassedToServer() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, NodeSelector.builder() .withOperation(EntityOperation.CREATE) @@ -382,7 +399,7 @@ void selectorsArePassedToServer() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, RelationshipSelector.builder() .withType("BORN_IN") @@ -392,7 +409,7 @@ void selectorsArePassedToServer() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, NodeSelector.builder() .withLabels(Set.of("Place")) @@ -410,7 +427,7 @@ void selectorsArePassedToServer() { @Test void selectorsDoFilteringCorrectly() { - try (var session = driver().session()) { + try (var session = driver.session()) { session.run("CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.id) IS NODE KEY") .consume(); session.run("CREATE CONSTRAINT FOR (p:Place) REQUIRE (p.id) IS NODE KEY") @@ -480,7 +497,7 @@ void selectorsDoFilteringCorrectly() { Map.of("bornIn", bornIn, "hospital", "state hospital", "doctor", "doctor who")) .consume(); - StepVerifier.create(new CDCClient(driver(), Duration.ZERO).query(current)) + StepVerifier.create(new CDCClient(driver, Duration.ZERO).query(current)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(person1)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(person2)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(place)) @@ -489,7 +506,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, NodeSelector.builder() .withOperation(EntityOperation.CREATE) @@ -508,7 +525,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, NodeSelector.builder() .withOperation(EntityOperation.CREATE) @@ -527,7 +544,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, NodeSelector.builder() .withOperation(EntityOperation.CREATE) @@ -546,7 +563,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, NodeSelector.builder() .withLabels(Set.of("Place")) @@ -584,7 +601,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, NodeSelector.builder() .withLabels(Set.of("Person")) @@ -616,7 +633,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, RelationshipSelector.builder() .withType("BORN_IN") @@ -647,7 +664,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, RelationshipSelector.builder() .withType("BORN_IN") @@ -679,7 +696,7 @@ void selectorsDoFilteringCorrectly() { // first matching selector wins StepVerifier.create(new CDCClient( - driver(), + driver, Duration.ZERO, RelationshipSelector.builder() .withOperation(EntityOperation.CREATE) @@ -707,7 +724,7 @@ void selectorsDoFilteringCorrectly() { @Test void userSelectorsFilterCorrectly() { // prepare - try (var session = driver().session()) { + try (var session = driver.session()) { session.run( "CREATE OR REPLACE USER $user SET PLAINTEXT PASSWORD $pwd CHANGE NOT REQUIRED", Map.of("user", "test", "pwd", "passw0rd")) @@ -720,7 +737,7 @@ void userSelectorsFilterCorrectly() { } // make changes with test user - try (var driver = GraphDatabase.driver(neo4j().getBoltUrl(), AuthTokens.basic("test", "passw0rd")); + try (var driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("test", "passw0rd")); var session = driver.session(); var impersonatedSession = driver.session( SessionConfig.builder().withImpersonatedUser("neo4j").build())) { @@ -733,13 +750,13 @@ void userSelectorsFilterCorrectly() { } // make changes with neo4j user - try (var session = driver().session()) { + try (var session = driver.session()) { session.run("UNWIND range(1, 100) AS n CREATE (:Neo4j {id: n})").consume(); } // verify authenticatedUser = test StepVerifier.create(new CDCClient( - driver(), + driver, EntitySelector.builder() .withAuthenticatedUser("test") .build()) @@ -755,7 +772,7 @@ void userSelectorsFilterCorrectly() { // verify authenticatedUser = neo4j StepVerifier.create(new CDCClient( - driver(), + driver, EntitySelector.builder() .withAuthenticatedUser("neo4j") .build()) @@ -771,7 +788,7 @@ void userSelectorsFilterCorrectly() { // verify executingUser = neo4j StepVerifier.create(new CDCClient( - driver(), + driver, EntitySelector.builder() .withExecutingUser("neo4j") .build()) @@ -787,7 +804,7 @@ void userSelectorsFilterCorrectly() { // verify executingUser = test StepVerifier.create(new CDCClient( - driver(), + driver, EntitySelector.builder() .withExecutingUser("test") .build()) @@ -803,7 +820,7 @@ void userSelectorsFilterCorrectly() { // verify authenticatedUser = test, executingUser = neo4j StepVerifier.create(new CDCClient( - driver(), + driver, EntitySelector.builder() .withAuthenticatedUser("test") .withExecutingUser("neo4j") @@ -823,7 +840,7 @@ void userSelectorsFilterCorrectly() { @Test void txMetadataSelectorFiltersCorrectly() { - try (var session = driver().session()) { + try (var session = driver.session()) { session.run( "UNWIND range(1, 100) AS n CREATE (:Test {id: n})", TransactionConfig.builder() @@ -832,7 +849,7 @@ void txMetadataSelectorFiltersCorrectly() { .consume(); } - try (var session = driver().session()) { + try (var session = driver.session()) { session.run( "UNWIND range(1, 100) AS n CREATE (:Other {id: n})", TransactionConfig.builder() @@ -841,7 +858,7 @@ void txMetadataSelectorFiltersCorrectly() { .consume(); } - try (var session = driver().session()) { + try (var session = driver.session()) { session.run( "UNWIND range(1, 100) AS n CREATE (:Another {id: n})", TransactionConfig.builder() @@ -851,7 +868,7 @@ void txMetadataSelectorFiltersCorrectly() { } StepVerifier.create(new CDCClient( - driver(), + driver, EntitySelector.builder() .withTxMetadata(Map.of("app", "Test")) .build()) @@ -866,7 +883,7 @@ void txMetadataSelectorFiltersCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, EntitySelector.builder() .withTxMetadata(Map.of("app", "Other")) .build()) @@ -882,7 +899,7 @@ void txMetadataSelectorFiltersCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver(), + driver, EntitySelector.builder() .withTxMetadata(Map.of("app", "Other", "appUser", "test")) .build()) diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientLatestIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientLatestIT.java deleted file mode 100644 index 6e32492..0000000 --- a/src/test/java/org/neo4j/cdc/client/CDCClientLatestIT.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.cdc.client; - -import java.util.Map; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.neo4j.driver.AuthTokens; -import org.neo4j.driver.Driver; -import org.neo4j.driver.GraphDatabase; -import org.testcontainers.containers.Neo4jContainer; -import org.testcontainers.junit.jupiter.Container; - -public class CDCClientLatestIT extends CDCClientIT { - - private static final String NEO4J_VERSION = "5"; - - @SuppressWarnings("resource") - @Container - private static final Neo4jContainer neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise") - .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes") - .withAdminPassword("passw0rd"); - - private static Driver driver; - - @BeforeAll - static void setup() { - driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd")); - } - - @AfterAll - static void cleanup() { - driver.close(); - } - - @Override - Driver driver() { - return driver; - } - - @Override - Neo4jContainer neo4j() { - return neo4j; - } - - @Override - Map defaultExpectedAdditionalEntries() { - return Map.of("databaseName", "neo4j"); - } -} diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientWithAggressiveLogPruningIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientWithAggressiveLogPruningIT.java index e18d0d2..b40e85d 100644 --- a/src/test/java/org/neo4j/cdc/client/CDCClientWithAggressiveLogPruningIT.java +++ b/src/test/java/org/neo4j/cdc/client/CDCClientWithAggressiveLogPruningIT.java @@ -150,7 +150,7 @@ void shouldNotFailWithInvalidChangeIdentifierAfterLogPruningWhenStreaming() private ChangeIdentifier current() { try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) { return new ChangeIdentifier( - session.run("CALL cdc.current()").single().get(0).asString()); + session.run("CALL db.cdc.current()").single().get(0).asString()); } } }