flushCommands leads to random inbound command order when using large argument values with SSL #2456

Closed
jruaux opened this issue Jul 20, 2023 · 6 comments
Labels
type: bug A general bug

@jruaux
Contributor

jruaux commented Jul 20, 2023

Bug Report

Current Behavior

Async commands issued in a pipeline fail only when using a TLS-enabled connection to Redis. The same code works fine with TLS disabled.

I do not see a timeout exception or any other error before the one in question (io.lettuce.core.output.StatusOutput does not support set(long)).

Do you have any idea why this happens only with TLS connections?

Stack trace
/35.230.39.240:12000, epid=0x4, chid=0x4] Stack contains: 11 commands
23:18:57.382 FINE io.lettuce.core.protocol.RedisStateMachine	: Decode done, empty stack: true
23:18:57.382 FINE io.lettuce.core.protocol.CommandHandler	: [channel=0xcb3d8c2e, /192.168.100.14:58213 -> /35.230.39.240:12000, epid=0x4, chid=0x4] Completing command LatencyMeteredCommand [type=SET, output=StatusOutput [output=null, error='io.lettuce.core.output.StatusOutput does not support set(long)'], commandType=io.lettuce.core.protocol.AsyncCommand]
23:18:57.383 FINE org.springframework.retry.support.RetryTemplate	: Checking for rethrow: count=1
23:18:57.383 FINE org.springframework.retry.support.RetryTemplate	: Rethrow in retry for policy: count=1
23:18:57.384 FINE org.springframework.batch.core.step.tasklet.TaskletStep	: Applying contribution: [StepContribution: read=11, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
23:18:57.384 FINE org.springframework.batch.core.step.tasklet.TaskletStep	: Rollback for Exception: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: io.lettuce.core.output.StatusOutput does not support set(long)

Input Code

The commands are pipelined in the execute method of this class:

https://github.com/redis-developer/spring-batch-redis/blob/master/subprojects/spring-batch-redis/src/main/java/com/redis/spring/batch/common/AbstractOperationItemStreamSupport.java

try (StatefulConnection<K, V> connection = pool.borrowObject()) {
	long timeout = connection.getTimeout().toMillis();
	BaseRedisAsyncCommands<K, V> commands = Utils.async(connection);
	List<RedisFuture<O>> futures = new ArrayList<>();
	try {
		connection.setAutoFlushCommands(false);
		execute(commands, items, futures);
		connection.flushCommands();
		List<O> results = new ArrayList<>(futures.size());
		for (RedisFuture<O> future : futures) {
			results.add(future.get(timeout, TimeUnit.MILLISECONDS));
		}
		return results;
	} finally {
		connection.setAutoFlushCommands(true);
	}
}

Environment

  • Lettuce version(s): 6.2.5.RELEASE
  • Redis version: 6.2.6 (Elasticache) and 6.2.10 (Redis Enterprise)
@mp911de
Collaborator

mp911de commented Jul 20, 2023

Can you please provide a reproducer that shows the problem using Lettuce code only? StatusOutput is used across multiple commands, so we first need to identify which command is involved and what causes the unexpected response.

@mp911de mp911de added the "status: waiting-for-feedback" label (We need additional information before we can continue) Jul 20, 2023
@jruaux
Contributor Author

jruaux commented Jul 20, 2023

Here you go:

package com.redis.lettuce.test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SslOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisKeyAsyncCommands;
import io.lettuce.core.api.async.RedisStringAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.support.ConnectionPoolSupport;

public class LettuceIssue2456 implements Callable<Integer> {

    private static final File HOME = new File(System.getProperty("user.home"));

    private static final File TLS = new File(HOME, "/git/redis/tests/tls");

    private static final File CACERT = new File(TLS, "ca.crt");

    private static final File CERT = new File(TLS, "redis.crt");

    private static final File KEY = new File(TLS, "redis.key");

    private static final int LEFT_LIMIT = 48; // numeral '0'

    private static final int RIGHT_LIMIT = 122; // letter 'z'

    private final int maxIterations = 1000;

    private final Random random = new Random();

    private final GenericObjectPool<StatefulRedisConnection<byte[], byte[]>> pool;

    public LettuceIssue2456(RedisClient client) {
        this.pool = ConnectionPoolSupport.createGenericObjectPool(() -> client.connect(ByteArrayCodec.INSTANCE),
                new GenericObjectPoolConfig<>());
    }

    private String randomString(int length) {
        return random.ints(LEFT_LIMIT, RIGHT_LIMIT + 1).filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97)).limit(length)
                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
    }

    @Override
    public Integer call() throws Exception {
        for (int iteration = 0; iteration < maxIterations; iteration++) {
            System.out.println("Running iteration " + iteration);
            try (StatefulRedisConnection<byte[], byte[]> connection = pool.borrowObject()) {
                long timeout = connection.getTimeout().toMillis();
                BaseRedisAsyncCommands<byte[], byte[]> commands = connection.async();
                List<RedisFuture<?>> futures = new ArrayList<>();
                try {
                    connection.setAutoFlushCommands(false);
                    execute(iteration, commands, futures);
                    connection.flushCommands();
                    LettuceFutures.awaitAll(timeout, TimeUnit.MILLISECONDS, futures.toArray(new Future[0]));
                } finally {
                    connection.setAutoFlushCommands(true);
                }
            }
        }
        return 0;
    }

    private byte[] key(int iteration, int index) {
        String key = String.format("{myaccount-%s}:", iteration) + UUID.randomUUID().toString();
        return toBytes(key);
    }

    private byte[] toBytes(String key) {
        return ByteArrayCodec.INSTANCE.decodeKey(StringCodec.UTF8.encodeKey(key));
    }

    @SuppressWarnings("unchecked")
    private void execute(int iteration, BaseRedisAsyncCommands<byte[], byte[]> commands, List<RedisFuture<?>> futures)
            throws IOException {
        int batchSize = 10;
        for (int index = 0; index < batchSize; index++) {
            byte[] key = key(iteration, index);
            byte[] value = toBytes(randomString(50000));
            RedisStringAsyncCommands<byte[], byte[]> stringCommands = (RedisStringAsyncCommands<byte[], byte[]>) commands;
            futures.add(stringCommands.set(key, value));
            if (random.nextBoolean()) {
                RedisKeyAsyncCommands<byte[], byte[]> keyCommands = (RedisKeyAsyncCommands<byte[], byte[]>) commands;
                futures.add(keyCommands.pexpireat(key, System.currentTimeMillis() + 10000));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        RedisClient client = client();
        new LettuceIssue2456(client).call();
    }

    private static RedisClient client() {
        RedisURI redisURI = RedisURI.builder().withHost("localhost").withSsl(true).withVerifyPeer(false).build();
        RedisClient client = RedisClient.create(redisURI);
        client.setOptions(clientOptions());
        return client;
    }

    private static ClientOptions clientOptions() {
        return ClientOptions.builder().sslOptions(sslOptions()).build();
    }

    private static SslOptions sslOptions() {
        return SslOptions.builder().trustManager(CACERT).keyManager(CERT, KEY, "".toCharArray()).build();
    }

}

Running with lettuce-core-6.2.5.RELEASE and commons-pool2-2.11.1 against a redis-server with TLS as described here, this produces the following error:

Running iteration 0
Exception in thread "main" io.lettuce.core.RedisException: java.lang.UnsupportedOperationException: io.lettuce.core.output.StatusOutput does not support set(long)
	at io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:106)
	at io.lettuce.core.internal.Futures.awaitAll(Futures.java:226)
	at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:59)
	at com.redis.lettuce.test.LettuceIssue2456.call(LettuceIssue2456.java:74)
	at com.redis.lettuce.test.LettuceIssue2456.main(LettuceIssue2456.java:113)
Caused by: java.lang.UnsupportedOperationException: io.lettuce.core.output.StatusOutput does not support set(long)
	at io.lettuce.core.output.CommandOutput.set(CommandOutput.java:107)
	at io.lettuce.core.protocol.RedisStateMachine.safeSet(RedisStateMachine.java:778)
	at io.lettuce.core.protocol.RedisStateMachine.handleInteger(RedisStateMachine.java:404)
	at io.lettuce.core.protocol.RedisStateMachine$State$Type.handle(RedisStateMachine.java:206)
	at io.lettuce.core.protocol.RedisStateMachine.doDecode(RedisStateMachine.java:334)
	at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:295)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:842)
	at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:793)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:767)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:659)
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:599)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1623)

@mp911de mp911de added the "status: waiting-for-triage" label and removed the "status: waiting-for-feedback" label (We need additional information before we can continue) Jul 21, 2023
@mp911de
Collaborator

mp911de commented Jul 21, 2023

Thanks a lot, I'll have a look.

@mp911de mp911de changed the title from "StatusOutput does not support set(long) with TLS-enabled Redis" to "flushCommands leads to random order when using large argument values with SSL" Aug 7, 2023
@mp911de mp911de added the "type: bug" label (A general bug) and removed the "status: waiting-for-triage" label Aug 7, 2023
@mp911de mp911de changed the title from "flushCommands leads to random order when using large argument values with SSL" to "flushCommands leads to random inbound command order when using large argument values with SSL" Aug 7, 2023
@mp911de
Collaborator

mp911de commented Aug 7, 2023

After debugging a bit, I now have a stable reproducer. So far, the following aspects seem necessary to reproduce the problem:

  1. Usage of SSL, specifically SslHandler
  2. Large command argument values (small values do not reproduce the issue)
  3. Using manual flushing

As far as I can tell, our I/O utilities report command write completion out of order. Commands are sent in order, but the I/O channel notifies them that they were sent in a different order. Because we add a command to the protocol stack only once we are notified that it was sent, the protocol stack ends up in a different order than the one in which the commands were sent:

Sending order:

(1) SET
(2) SET
(3) PEXPIREAT

Sending notification:

(1) SET
(3) PEXPIREAT
(2) SET

mp911de added a commit that referenced this issue Aug 7, 2023
To avoid promise completion reordering causing a different protocol stack order, we now eagerly add commands to the protocol stack,
mp911de added a commit that referenced this issue Aug 7, 2023
To avoid promise completion reordering causing a different protocol stack order, we now eagerly add commands to the protocol stack,
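
The idea described in the commit message can be sketched as follows. This is an illustration under stated assumptions, not the actual change: EagerEnqueueSketch and its methods are hypothetical, write promises are modelled with plain CompletableFuture instances, and the completion order is passed in explicitly to imitate the reordering observed with SslHandler. The real commits may differ, for example in how failed writes are handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in, not Lettuce code: lazy versus eager protocol stack population. */
final class EagerEnqueueSketch {

    /** Lazy variant: a command joins the stack only when its write promise completes. */
    static List<String> enqueueOnCompletion(List<String> commands, int[] completionOrder) {
        List<String> stack = new ArrayList<>();
        List<CompletableFuture<Void>> promises = new ArrayList<>();
        for (String command : commands) {
            CompletableFuture<Void> promise = new CompletableFuture<>();
            promise.thenRun(() -> stack.add(command)); // stack order follows completion order
            promises.add(promise);
        }
        for (int index : completionOrder) {
            promises.get(index).complete(null);
        }
        return stack;
    }

    /** Eager variant: a command joins the stack at write time, before any promise completes. */
    static List<String> enqueueEagerly(List<String> commands, int[] completionOrder) {
        List<String> stack = new ArrayList<>();
        List<CompletableFuture<Void>> promises = new ArrayList<>();
        for (String command : commands) {
            stack.add(command); // stack order follows sending order
            promises.add(new CompletableFuture<>());
        }
        for (int index : completionOrder) {
            promises.get(index).complete(null); // reordered completion no longer affects the stack
        }
        return stack;
    }

    public static void main(String[] args) {
        List<String> commands = Arrays.asList("SET#1", "SET#2", "PEXPIREAT#3");
        int[] completionOrder = {0, 2, 1}; // the notification order reported in this issue
        System.out.println(enqueueOnCompletion(commands, completionOrder));
        System.out.println(enqueueEagerly(commands, completionOrder));
    }
}
```

In the eager variant the stack order is fixed when the commands are written, so a later reordering of promise completions cannot change which command a reply is matched to.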
@mp911de mp911de closed this as completed Aug 7, 2023
@mp911de mp911de added this to the 6.2.6.RELEASE milestone Aug 7, 2023
@jruaux
Contributor Author

jruaux commented Aug 7, 2023

Thank you for resolving this!

@zhaoxiaojie0415

After debugging a bit, I have now a stable reproducer. So far, the following aspects seem significant to reproduce the problem:

  1. Usage of SSL, specifically SslHandler
  2. Large command argument values (small values do not reproduce the issue)
  3. Using manual flushing

As far as I've seen, our I/O utilities report command sending completion out of order. Commands are sent in order but the I/O channel uses a different ordering in which the commands are notified that they were sent. We're adding commands to the protocol stack once we received that the command was sent and that protocol stack has a different ordering:

Sending order:

(1) SET
(2) SET
(3) PEXPIREAT

Sending notification:

(1) SET
(3) PEXPIREAT
(2) SET

Hello, I saw the change made for this bug and I have a question. Since a single TCP connection is handled by a single thread in Netty, why would the I/O channel invoke operationComplete in a different order?
