Skip to content

Commit

Permalink
WATCH is now working in the same time as MULTI when called inside a M…
Browse files Browse the repository at this point in the history
…ULTI (#3027)
  • Loading branch information
tishun committed Dec 1, 2024
1 parent 7f455ec commit 1cd0ecf
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ private static boolean isTransactionActive(StatefulConnection<?, ?> connection)

private static boolean isTxControlMethod(String methodName, Object[] args) {

if (methodName.equals("exec") || methodName.equals("multi") || methodName.equals("discard")) {
if (methodName.equals("exec") || methodName.equals("multi") || methodName.equals("discard")
|| methodName.equals("watch")) {
return true;
}

Expand Down
54 changes: 26 additions & 28 deletions src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,6 @@ public boolean isMulti() {
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {

RedisCommand<K, V, T> toSend = preProcessCommand(command);

potentiallyEnableMulti(command);

return super.dispatch(toSend);
}

Expand All @@ -179,35 +176,21 @@ public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {

commands.forEach(o -> {
RedisCommand<K, V, ?> command = preProcessCommand(o);

sentCommands.add(command);
potentiallyEnableMulti(command);
});

return super.dispatch(sentCommands);
}

private void potentiallyEnableMulti(RedisCommand<K, V, ?> command) {

if (command.getType().toString().equals(MULTI.name())) {

multi = (multi == null ? new MultiOutput<>(codec) : multi);

if (command instanceof CompleteableCommand) {
((CompleteableCommand<?>) command).onComplete((ignored, e) -> {
if (e != null) {
multi = null;
}
});
}
}
}

// TODO [tihomir.mateev] Refactor to include as part of the Command interface
// All these if statements clearly indicate this is a problem best solve by each command
// (defining a pre and post processing behaviour of the command)
protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> command) {

RedisCommand<K, V, T> local = command;
String commandType = command.getType().toString();

if (local.getType().toString().equals(AUTH.name())) {
if (commandType.equals(AUTH.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {

Expand All @@ -224,7 +207,7 @@ protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> comm
});
}

if (local.getType().toString().equals(SELECT.name())) {
if (commandType.equals(SELECT.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
Long db = CommandArgsAccessor.getFirstInteger(command.getArgs());
Expand All @@ -235,30 +218,30 @@ protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> comm
});
}

if (local.getType().toString().equals(READONLY.name())) {
if (commandType.equals(READONLY.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
state.setReadOnly(true);
}
});
}

if (local.getType().toString().equals(READWRITE.name())) {
if (commandType.equals(READWRITE.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
state.setReadOnly(false);
}
});
}

if (local.getType().toString().equals(DISCARD.name())) {
if (commandType.equals(DISCARD.name())) {
if (multi != null) {
multi.cancel();
multi = null;
}
}

if (local.getType().toString().equals(EXEC.name())) {
if (commandType.equals(EXEC.name())) {
MultiOutput<K, V> multiOutput = this.multi;
this.multi = null;
if (multiOutput == null) {
Expand All @@ -267,10 +250,25 @@ protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> comm
local.setOutput((MultiOutput) multiOutput);
}

if (multi != null && !local.getType().toString().equals(MULTI.name())) {
if (multi != null && !commandType.equals(MULTI.name()) && !commandType.equals(WATCH.name())) {
// ignore MULTI and WATCH commands nested in another MULTI
local = new TransactionalCommand<>(local);
multi.add(local);
}

if (commandType.equals(MULTI.name())) {

multi = (multi == null ? new MultiOutput<>(codec) : multi);

if (command instanceof CompleteableCommand) {
((CompleteableCommand<?>) command).onComplete((ignored, e) -> {
if (e != null) {
multi = null;
}
});
}
}

return local;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ void errorInMulti() {
assertThat((String) values.get(2)).isEqualTo(value);
}

@Test
void errorWhileWatchInsideMulti() {
assertThat(redis.multi()).isEqualTo("OK");
assertThat(redis.set(key, value)).isEqualTo(null);
assertThatThrownBy(() -> redis.watch(key)).isInstanceOf(RedisCommandExecutionException.class)
.hasMessageContaining("ERR WATCH inside MULTI is not allowed");
assertThat(redis.get(key)).isEqualTo(null);
TransactionResult values = redis.exec();
assertThat(values.wasDiscarded()).isFalse();
assertThat((String) values.get(0)).isEqualTo("OK");
assertThat((String) values.get(1)).isEqualTo(value);
}

@Test
void errorWhileMultiInsideMulti() {
assertThat(redis.multi()).isEqualTo("OK");
assertThat(redis.set(key, value)).isEqualTo(null);
assertThatThrownBy(redis::multi).isInstanceOf(RedisCommandExecutionException.class)
.hasMessageContaining("ERR MULTI calls can not be nested");
assertThat(redis.get(key)).isEqualTo(null);
TransactionResult values = redis.exec();
assertThat(values.wasDiscarded()).isFalse();
}

@Test
void execWithoutMulti() {
assertThatThrownBy(redis::exec).isInstanceOf(RedisCommandExecutionException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ protected Object handleInvocation(Object proxy, Method method, Object[] args) th
if (result instanceof Mono<?>) {
Mono<?> mono = (Mono<?>) result;

if (!method.getName().equals("exec") && !method.getName().equals("multi")) {
if (!method.getName().equals("exec") && !method.getName().equals("multi")
&& !method.getName().equals("watch")) {
if (connection instanceof StatefulRedisConnection && ((StatefulRedisConnection) connection).isMulti()) {
mono.subscribe();
return null;
Expand Down

0 comments on commit 1cd0ecf

Please sign in to comment.