diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index f6e5687cf9..db05734500 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -337,7 +337,7 @@ public Long getMaxlen() { * @since 2.3 */ public boolean hasMaxlen() { - return maxlen != null && maxlen > 0; + return maxlen != null; } /** @@ -685,7 +685,7 @@ default Mono xPending(ByteBuffer key, String groupName) Assert.notNull(key, "Key must not be null"); Assert.notNull(groupName, "GroupName must not be null"); - return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next() + return xPendingSummary(Mono.just(PendingRecordsCommand.pending(key, groupName))).next() .map(CommandResponse::getOutput); } @@ -726,7 +726,7 @@ default Mono xPending(ByteBuffer key, Consumer consumer) { */ @Nullable default Mono xPending(ByteBuffer key, String groupName, String consumerName) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next() + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName))).next() .map(CommandResponse::getOutput); } @@ -743,7 +743,7 @@ default Mono xPending(ByteBuffer key, String groupName, String * @since 2.3 */ default Mono xPending(ByteBuffer key, String groupName, Range range, Long count) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next() + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count))).next() .map(CommandResponse::getOutput); } @@ -779,8 +779,8 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range< */ default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, Long count) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next() - .map(CommandResponse::getOutput); + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count))) + .next().map(CommandResponse::getOutput); } /** @@ -832,9 +832,15 @@ static PendingRecordsCommand pending(ByteBuffer key, String groupName) { /** * Create new {@link PendingRecordsCommand} with given {@link Range} and limit. * + * @param range must not be {@literal null}. + * @param count the max number of messages to return. Must not be negative. * @return new instance of {@link XPendingOptions}. */ - public PendingRecordsCommand range(Range range, Long count) { + public PendingRecordsCommand range(Range range, Long count) { + + Assert.notNull(range, "Range must not be null"); + Assert.isTrue(count > -1, "Count must not be negative"); + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); } @@ -886,7 +892,7 @@ public boolean hasConsumer() { * @return {@literal true} count is set. */ public boolean isLimited() { - return count != null && count > -1; + return count != null; } } diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index df9cbd9f52..df097158b0 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -214,7 +214,7 @@ public Long getMaxlen() { * @return {@literal true} if {@literal MAXLEN} is set. */ public boolean hasMaxlen() { - return maxlen != null && maxlen > 0; + return maxlen != null; } /** @@ -788,19 +788,28 @@ public static XPendingOptions unbounded() { /** * Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}). * - * @param count the max number of messages to return. Must not be {@literal null}. + * @param count the max number of messages to return. Must not be negative. * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded(Long count) { + + Assert.isTrue(count > -1, "Count must not be negative"); + return new XPendingOptions(null, Range.unbounded(), count); } /** * Create new {@link XPendingOptions} with given {@link Range} and limit. * + * @param range must not be {@literal null}. + * @param count the max number of messages to return. Must not be negative. * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions range(Range range, Long count) { + + Assert.notNull(range, "Range must not be null"); + Assert.isTrue(count > -1, "Count must not be negative"); + return new XPendingOptions(null, range, count); } @@ -848,7 +857,7 @@ public boolean hasConsumer() { * @return {@literal true} count is set. */ public boolean isLimited() { - return count != null && count > -1; + return count != null; } } diff --git a/src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java b/src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java index 45c456c0bc..58fd0652e8 100644 --- a/src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java +++ b/src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java @@ -96,7 +96,7 @@ public StreamReadOptions block(Duration timeout) { */ public StreamReadOptions count(long count) { - Assert.isTrue(count > 0, "Count must be greater or equal to zero"); + Assert.isTrue(count > 0, "Count must be greater than zero"); return new StreamReadOptions(block, count, noack); } diff --git a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java new file mode 100644 index 0000000000..bef26ac866 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java @@ -0,0 +1,41 @@ +package org.springframework.data.redis.connection; + +import static org.assertj.core.api.Assertions.*; + +import java.nio.ByteBuffer; + +import org.junit.jupiter.api.Test; + +import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.ReactiveStreamCommands.PendingRecordsCommand; + +/** + * Unit tests for {@link ReactiveStreamCommands}. + * + * @author jinkshower + */ +class ReactiveStreamCommandsUnitTests { + + @Test // GH-2982 + void pendingRecordsCommandRangeShouldThrowExceptionWhenRangeIsNull() { + + ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes()); + String groupName = "my-group"; + + PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName); + + assertThatThrownBy(() -> command.range(null, 10L)).isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-2982 + void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() { + + ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes()); + String groupName = "my-group"; + + PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName); + Range range = Range.closed("0", "10"); + + assertThatThrownBy(() -> command.range(range, -1L)).isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java new file mode 100644 index 0000000000..635f6cbe12 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java @@ -0,0 +1,36 @@ +package org.springframework.data.redis.connection; + +import static org.assertj.core.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; + +/** + * Unit tests for {@link RedisStreamCommands}. + * + * @author jinkshower + */ +class RedisStreamCommandsUnitTests { + + @Test // GH-2982 + void xPendingOptionsUnboundedShouldThrowExceptionWhenCountIsNegative() { + + assertThatThrownBy(() -> XPendingOptions.unbounded(-1L)).isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-2982 + void xPendingOptionsRangeShouldThrowExceptionWhenRangeIsNull() { + + assertThatThrownBy(() -> XPendingOptions.range(null, 10L)).isInstanceOf(IllegalArgumentException.class); + } + + @Test // GH-2982 + void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() { + + Range range = Range.closed("0", "10"); + + assertThatThrownBy(() -> XPendingOptions.range(range, -1L)).isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java index 27cf52f0f7..b09d1fb284 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java @@ -289,6 +289,21 @@ void addMaxLenShouldLimitSimpleMessageWithRawSerializerSize() { .verifyComplete(); } + @ParameterizedRedisTest // GH-2982 + void addNegativeMaxlenShouldThrowException() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(-1).approximateTrimming(false); + + streamOperations.add(key, Collections.singletonMap(hashKey, value), options).as(StepVerifier::create) + .expectError(IllegalArgumentException.class).verify(); + + streamOperations.range(key, Range.unbounded()).as(StepVerifier::create).expectNextCount(0L).verifyComplete(); + } + @ParameterizedRedisTest // GH-2915 void addMinIdShouldEvictLowerIdMessages() { @@ -528,6 +543,21 @@ void pendingShouldReadMessageDetails() { } + @ParameterizedRedisTest // GH-2982 + void pendingNegativeCountShouldThrowException() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + + streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").block(); + + streamOperations.pending(key, "my-group", Range.unbounded(), -1L).as(StepVerifier::create) + .expectError(IllegalArgumentException.class).verify(); + } + @ParameterizedRedisTest // GH-2465 void claimShouldReadMessageDetails() { diff --git a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java index 06ab8c5982..7eac62d74a 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java @@ -206,6 +206,19 @@ void addMaxLenShouldLimitSimpleMessagesSize() { assertThat(message.getValue()).isEqualTo(newValue); } + @ParameterizedRedisTest // GH-2982 + void addNegativeMaxlenShouldThrowException() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(-1).approximateTrimming(false); + + assertThatThrownBy(() -> streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options)); + + assertThat(streamOps.range(key, Range.unbounded())).isEmpty(); + } + @ParameterizedRedisTest // GH-2915 void addMinIdShouldEvictLowerIdMessages() { @@ -565,6 +578,19 @@ void pendingShouldReadMessageDetails() { assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); } + @ParameterizedRedisTest // GH-2982 + void pendingNegativeCountShouldThrowException() { + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + streamOps.add(key, Collections.singletonMap(hashKey, value)); + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + + assertThatThrownBy(() -> streamOps.pending(key, "my-group", Range.unbounded(), -1L)) + .isInstanceOf(IllegalArgumentException.class); + } + @ParameterizedRedisTest // GH-2465 void claimShouldReadMessageDetails() {