Skip to content

Commit

Permalink
fix: use new procedure names (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince authored Aug 27, 2024
1 parent 86cd34d commit 2070ee2
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 192 deletions.
16 changes: 8 additions & 8 deletions src/main/java/org/neo4j/cdc/client/CDCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
public class CDCClient implements CDCService {
private final Logger log = LoggerFactory.getLogger(CDCClient.class);

private static final String CDC_EARLIEST_STATEMENT = "call cdc.earliest()";
private static final String CDC_CURRENT_STATEMENT = "call cdc.current()";
private static final String CDC_QUERY_STATEMENT = "call cdc.query($from, $selectors)";
private static final String CDC_EARLIEST_STATEMENT = "call db.cdc.earliest()";
private static final String CDC_CURRENT_STATEMENT = "call db.cdc.current()";
private static final String CDC_QUERY_STATEMENT = "call db.cdc.query($from, $selectors)";
private final Driver driver;
private final List<Selector> selectors;
private final SessionConfigSupplier sessionConfigSupplier;
Expand Down Expand Up @@ -114,12 +114,12 @@ public CDCClient(

@Override
public Mono<ChangeIdentifier> earliest() {
return queryForChangeIdentifier(CDC_EARLIEST_STATEMENT, "cdc.earliest");
return queryForChangeIdentifier(CDC_EARLIEST_STATEMENT, "db.cdc.earliest");
}

@Override
public Mono<ChangeIdentifier> current() {
return queryForChangeIdentifier(CDC_CURRENT_STATEMENT, "cdc.current");
return queryForChangeIdentifier(CDC_CURRENT_STATEMENT, "db.cdc.current");
}

@Override
Expand All @@ -133,7 +133,7 @@ public Flux<ChangeEvent> query(ChangeIdentifier from) {
"selectors",
selectors.stream().map(Selector::asMap).collect(Collectors.toList()));

log.trace("running cdc.query using parameters {}", params);
log.trace("running db.cdc.query using parameters {}", params);
RxResult result = tx.run(CDC_QUERY_STATEMENT, params);

return Flux.from(result.records())
Expand All @@ -152,7 +152,7 @@ public Flux<ChangeEvent> stream(ChangeIdentifier from) {
var query = Flux.usingWhen(
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Flux.from(session.readTransaction(tx -> {
var current = Mono.from(tx.run("CALL cdc.current()").records())
var current = Mono.from(tx.run("CALL db.cdc.current()").records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeIdentifier);

Expand All @@ -162,7 +162,7 @@ public Flux<ChangeEvent> stream(ChangeIdentifier from) {
"selectors",
selectors.stream().map(Selector::asMap).collect(Collectors.toList()));

log.trace("running cdc.query using parameters {}", params);
log.trace("running db.cdc.query using parameters {}", params);
RxResult result = tx.run(CDC_QUERY_STATEMENT, params);

return current.flatMapMany(changeId -> Flux.from(result.records())
Expand Down
69 changes: 0 additions & 69 deletions src/test/java/org/neo4j/cdc/client/CDCClient514IT.java

This file was deleted.

Loading

0 comments on commit 2070ee2

Please sign in to comment.