Skip to content

Commit

Permalink
feat: add transaction config supplier (#58)
Browse files Browse the repository at this point in the history
* feat: add transaction config supplier

* feat: add config to all read transaction methods

* style: format licence

* test: verify tx config supplier being used

* style: formating
  • Loading branch information
Emrehzl94 authored Aug 27, 2024
1 parent f928a86 commit dcee7ac
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 42 deletions.
121 changes: 79 additions & 42 deletions src/main/java/org/neo4j/cdc/client/CDCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.cdc.client.selector.Selector;
import org.neo4j.driver.Driver;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.types.MapAccessor;
Expand All @@ -48,6 +49,7 @@ public class CDCClient implements CDCService {
private final Driver driver;
private final List<Selector> selectors;
private final SessionConfigSupplier sessionConfigSupplier;
private final TransactionConfigSupplier transactionConfigSupplier;
private final Duration streamingPollInterval;

/**
Expand All @@ -74,6 +76,7 @@ public CDCClient(Driver driver, Selector... selectors) {
public CDCClient(Driver driver, Duration streamingPollInterval, Selector... selectors) {
this.driver = Objects.requireNonNull(driver);
this.sessionConfigSupplier = () -> SessionConfig.builder().build();
this.transactionConfigSupplier = () -> TransactionConfig.builder().build();
this.streamingPollInterval = Objects.requireNonNull(streamingPollInterval);
this.selectors = selectors == null ? List.of() : Arrays.asList(selectors);
}
Expand Down Expand Up @@ -108,6 +111,31 @@ public CDCClient(
Selector... selectors) {
this.driver = Objects.requireNonNull(driver);
this.sessionConfigSupplier = sessionConfigSupplier;
this.transactionConfigSupplier = () -> TransactionConfig.builder().build();
this.streamingPollInterval = Objects.requireNonNull(streamingPollInterval);
this.selectors = selectors == null ? List.of() : Arrays.asList(selectors);
}

/**
* Construct an instance from a driver, a session config supplier, a transaction config supplier, a poll interval and an optional list of selectors.
*
* @param driver Driver instance to use
* @param sessionConfigSupplier a supplier to customise session configuration
* @param transactionConfigSupplier a supplier to customise transaction configuration
* @param streamingPollInterval Polling interval to mimic streaming when using @link{stream} method
* @param selectors List of selectors to query changes for
*
* @see Selector
*/
public CDCClient(
Driver driver,
SessionConfigSupplier sessionConfigSupplier,
TransactionConfigSupplier transactionConfigSupplier,
Duration streamingPollInterval,
Selector... selectors) {
this.driver = Objects.requireNonNull(driver);
this.sessionConfigSupplier = sessionConfigSupplier;
this.transactionConfigSupplier = transactionConfigSupplier;
this.streamingPollInterval = Objects.requireNonNull(streamingPollInterval);
this.selectors = selectors == null ? List.of() : Arrays.asList(selectors);
}
Expand All @@ -126,20 +154,24 @@ public Mono<ChangeIdentifier> current() {
public Flux<ChangeEvent> query(ChangeIdentifier from) {
return Flux.usingWhen(
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Flux.from(session.readTransaction(tx -> {
var params = Map.of(
"from",
from.getId(),
"selectors",
selectors.stream().map(Selector::asMap).collect(Collectors.toList()));

log.trace("running db.cdc.query using parameters {}", params);
RxResult result = tx.run(CDC_QUERY_STATEMENT, params);

return Flux.from(result.records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeEvent);
})),
(RxSession session) -> Flux.from(session.readTransaction(
tx -> {
var params = Map.of(
"from",
from.getId(),
"selectors",
selectors.stream()
.map(Selector::asMap)
.collect(Collectors.toList()));

log.trace("running db.cdc.query using parameters {}", params);
RxResult result = tx.run(CDC_QUERY_STATEMENT, params);

return Flux.from(result.records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeEvent);
},
transactionConfigSupplier.transactionConfig())),
RxSession::close)
.map(this::applyPropertyFilters)
.doOnSubscribe(s -> log.trace("subscribed to cdc query"))
Expand All @@ -151,28 +183,31 @@ public Flux<ChangeEvent> stream(ChangeIdentifier from) {

var query = Flux.usingWhen(
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Flux.from(session.readTransaction(tx -> {
var current = Mono.from(tx.run("CALL db.cdc.current()").records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeIdentifier);

var params = Map.of(
"from",
cursor.get().getId(),
"selectors",
selectors.stream().map(Selector::asMap).collect(Collectors.toList()));

log.trace("running db.cdc.query using parameters {}", params);
RxResult result = tx.run(CDC_QUERY_STATEMENT, params);

return current.flatMapMany(changeId -> Flux.from(result.records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeEvent)
.switchIfEmpty(Flux.defer(() -> {
cursor.set(changeId);
return Flux.empty();
})));
})),
(RxSession session) -> Flux.from(session.readTransaction(
tx -> {
var current = Mono.from(
tx.run("CALL db.cdc.current()").records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeIdentifier);

var params = Map.of(
"from",
cursor.get().getId(),
"selectors",
selectors.stream().map(Selector::asMap).collect(Collectors.toList()));

log.trace("running db.cdc.query using parameters {}", params);
RxResult result = tx.run(CDC_QUERY_STATEMENT, params);

return current.flatMapMany(changeId -> Flux.from(result.records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeEvent)
.switchIfEmpty(Flux.defer(() -> {
cursor.set(changeId);
return Flux.empty();
})));
},
transactionConfigSupplier.transactionConfig())),
RxSession::close);

return Flux.concat(query, Mono.delay(streamingPollInterval).mapNotNull(x -> null))
Expand Down Expand Up @@ -200,12 +235,14 @@ private ChangeEvent applyPropertyFilters(ChangeEvent original) {
private Mono<ChangeIdentifier> queryForChangeIdentifier(String query, String description) {
return Mono.usingWhen(
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Mono.from(session.readTransaction(tx -> {
RxResult result = tx.run(query);
return Mono.from(result.records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeIdentifier);
})),
(RxSession session) -> Mono.from(session.readTransaction(
tx -> {
RxResult result = tx.run(query);
return Mono.from(result.records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeIdentifier);
},
transactionConfigSupplier.transactionConfig())),
RxSession::close)
.doOnSubscribe(s -> log.trace("subscribed to {}", description))
.doOnSuccess(c -> log.trace("subscription to {} completed with '{}'", description, c))
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/org/neo4j/cdc/client/TransactionConfigSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.cdc.client;

import org.neo4j.driver.TransactionConfig;

/**
* The implementation can provide a transaction config that
* will be called each time before a transaction gets created.
*/
@FunctionalInterface
public interface TransactionConfigSupplier {
/**
* {@link TransactionConfig} to be used with the current transaction.
*
* @return transactionConfig object.
*/
TransactionConfig transactionConfig();
}
25 changes: 25 additions & 0 deletions src/test/java/org/neo4j/cdc/client/CDCClientIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,31 @@ void respectsSessionConfigSupplier() {
.verify();
}

@Test
void respectsTransactionConfigSupplier() {
var clientToSuccess = new CDCClient(
driver,
() -> SessionConfig.builder().build(),
() -> TransactionConfig.builder()
.withMetadata(Map.of("app", "test"))
.build(),
Duration.ofSeconds(1));

StepVerifier.create(clientToSuccess.query(current)).verifyComplete();

var clientToFail = new CDCClient(
driver,
() -> SessionConfig.builder().build(),
() -> TransactionConfig.builder()
.withTimeout(Duration.ofNanos(-1))
.build(),
Duration.ofSeconds(1));

StepVerifier.create(clientToFail.query(current))
.expectErrorMessage("Transaction timeout should not be negative")
.verify();
}

@Test
void shouldReturnCypherTypesWithoutConversion() {
var client = new CDCClient(driver, Duration.ZERO);
Expand Down

0 comments on commit dcee7ac

Please sign in to comment.