From 41541ccd608dd7ae0da6939084532e92dbafd16c Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 4 Jul 2023 14:39:36 +0200 Subject: [PATCH] Implement XPending redis command --- .../datasource/stream/PendingMessage.java | 68 +++++++++ .../stream/ReactiveStreamCommands.java | 58 +++++++ .../ReactiveTransactionalStreamCommands.java | 59 +++++++- .../datasource/stream/StreamCommands.java | 58 +++++++ .../stream/TransactionalStreamCommands.java | 56 ++++++- .../redis/datasource/stream/XPendingArgs.java | 51 +++++++ .../datasource/stream/XPendingSummary.java | 64 ++++++++ .../datasource/AbstractStreamCommands.java | 37 +++++ .../BlockingStreamCommandsImpl.java | 18 +++ ...ockingTransactionalStreamCommandsImpl.java | 16 ++ .../ReactiveStreamCommandsImpl.java | 52 +++++++ ...activeTransactionalStreamCommandsImpl.java | 19 +++ .../redis/datasource/StreamCommandsTest.java | 142 ++++++++++++++++++ .../TransactionalStreamCommandsTest.java | 35 ++++- 14 files changed, 725 insertions(+), 8 deletions(-) create mode 100644 extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/PendingMessage.java create mode 100644 extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingArgs.java create mode 100644 extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/PendingMessage.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/PendingMessage.java new file mode 100644 index 0000000000000..7492a4f92e61c --- /dev/null +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/PendingMessage.java @@ -0,0 +1,68 @@ +package io.quarkus.redis.datasource.stream; + +import java.time.Duration; + +/** + * Represents the result of an expended xpending command. + * In the extended form we no longer see the summary information, instead there is detailed information for each message + * in the pending entries list. For each message four attributes are returned: + * + *

+ * While this structure is mutable, it is highly recommended to use it as a read-only structure. + */ +public class PendingMessage { + private final String messageId; + private final String consumer; + private final Duration durationSinceLastDelivery; + + private final int deliveryCount; + + public PendingMessage(String messageId, String consumer, Duration durationSinceLastDelivery, int deliveryCount) { + this.messageId = messageId; + this.consumer = consumer; + this.durationSinceLastDelivery = durationSinceLastDelivery; + this.deliveryCount = deliveryCount; + } + + /** + * Gets the message id. + * + * @return the message id; + */ + public String getMessageId() { + return messageId; + } + + /** + * Gets the consumer name. + * + * @return the consumer name + */ + public String getConsumer() { + return consumer; + } + + /** + * Gets the duration since the last delivery attempt. + * + * @return the duration + */ + public Duration getDurationSinceLastDelivery() { + return durationSinceLastDelivery; + } + + /** + * Gets the number of delivery attempts. + * + * @return the number of attempts + */ + public int getDeliveryCount() { + return deliveryCount; + } +} diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveStreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveStreamCommands.java index 20266c54fbd2d..2d27cd3ce4a5a 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveStreamCommands.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveStreamCommands.java @@ -590,4 +590,62 @@ Uni>> xreadgroup(String group, String consumer, Map< */ Uni xtrim(K key, XTrimArgs args); + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the summary form. + * + * @param key the key + * @param group the group + * @return A {@link io.smallrye.mutiny.Uni} emitting the xpending summary + */ + Uni xpending(K key, String group); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the extended form. + * + * @param key the key + * @param group the group + * @param range the range + * @param count the number of message to include + * @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages + */ + Uni> xpending(K key, String group, StreamRange range, int count); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the extended form. + * + * If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item. + * + * @param key the key + * @param group the group + * @param range the range + * @param count the number of message to include + * @param args the extra argument (idle and consumer) + * @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages + */ + Uni> xpending(K key, String group, StreamRange range, int count, XPendingArgs args); + } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveTransactionalStreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveTransactionalStreamCommands.java index 75c5f1d41419b..a15e7548c0d7e 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveTransactionalStreamCommands.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveTransactionalStreamCommands.java @@ -1,7 +1,6 @@ package io.quarkus.redis.datasource.stream; import java.time.Duration; -import java.util.List; import java.util.Map; import io.quarkus.redis.datasource.ReactiveTransactionalRedisCommands; @@ -614,4 +613,62 @@ public interface ReactiveTransactionalStreamCommands extends ReactiveTr * otherwise. In the case of failure, the transaction is discarded. */ Uni xtrim(K key, XTrimArgs args); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the summary form. + * + * @param key the key + * @param group the group + * @return A {@link io.smallrye.mutiny.Uni} emitting the xpending summary + */ + Uni xpending(K key, String group); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the extended form. + * + * @param key the key + * @param group the group + * @param range the range + * @param count the number of message to include + * @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages + */ + Uni xpending(K key, String group, StreamRange range, int count); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the extended form. + *

+ * If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item. + * + * @param key the key + * @param group the group + * @param range the range + * @param count the number of message to include + * @param args the extra argument (idle and consumer) + * @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages + */ + Uni xpending(K key, String group, StreamRange range, int count, XPendingArgs args); } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/StreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/StreamCommands.java index 4b5792649c146..f195e7fbd7665 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/StreamCommands.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/StreamCommands.java @@ -582,4 +582,62 @@ List> xreadgroup(String group, String consumer, MapXPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the summary form. + * + * @param key the key + * @param group the group + * @return the xpending summary + */ + XPendingSummary xpending(K key, String group); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the extended form. + * + * @param key the key + * @param group the group + * @param range the range + * @param count the number of message to include + * @return the list of pending messages + */ + List xpending(K key, String group, StreamRange range, int count); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the extended form. + * + * If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item. + * + * @param key the key + * @param group the group + * @param range the range + * @param count the number of message to include + * @param args the extra argument (idle and consumer) + * @return the list of pending messages + */ + List xpending(K key, String group, StreamRange range, int count, XPendingArgs args); } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/TransactionalStreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/TransactionalStreamCommands.java index a72b9f9a66074..3fa742ce92074 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/TransactionalStreamCommands.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/TransactionalStreamCommands.java @@ -1,7 +1,6 @@ package io.quarkus.redis.datasource.stream; import java.time.Duration; -import java.util.List; import java.util.Map; import io.quarkus.redis.datasource.TransactionalRedisCommands; @@ -551,4 +550,59 @@ public interface TransactionalStreamCommands extends TransactionalRedis * @param args the extra parameters */ void xtrim(K key, XTrimArgs args); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the summary form. + * + * @param key the key + * @param group the group + */ + void xpending(K key, String group); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the extended form. + * + * @param key the key + * @param group the group + * @param range the range + * @param count the number of message to include + */ + void xpending(K key, String group, StreamRange range, int count); + + /** + * Execute the command XPENDING. + * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very + * important command in order to observe and understand what is happening with a streams consumer groups: what + * clients are active, what messages are pending to be consumed, or to see if there are idle messages. + *

+ * Group: stream + * Requires Redis 5.0.0+ + *

+ * This variant of xpending uses the extended form. + *

+ * If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item. + * + * @param key the key + * @param group the group + * @param range the range + * @param count the number of message to include + * @param args the extra argument (idle and consumer) + */ + void xpending(K key, String group, StreamRange range, int count, XPendingArgs args); } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingArgs.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingArgs.java new file mode 100644 index 0000000000000..e5c1db2afadbf --- /dev/null +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingArgs.java @@ -0,0 +1,51 @@ +package io.quarkus.redis.datasource.stream; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import io.quarkus.redis.datasource.RedisCommandExtraArguments; + +public class XPendingArgs implements RedisCommandExtraArguments { + + private String owner; + + private Duration idle; + + /** + * Sets the specific owner of the message + * + * @param owner the name of the consumer + * @return the current {@code XPendingArgs} + */ + public XPendingArgs consumer(String owner) { + this.owner = owner; + return this; + } + + /** + * Filters pending stream entries by their idle-time. + * + * @param idle the duration + * @return the current {@code XPendingArgs} + */ + public XPendingArgs idle(Duration idle) { + this.idle = idle; + return this; + } + + public Duration idle() { + return idle; + } + + @Override + public List toArgs() { + List args = new ArrayList<>(); + + if (owner != null) { + args.add(owner); + } + + return args; + } +} diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java new file mode 100644 index 0000000000000..ee61127ca9751 --- /dev/null +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java @@ -0,0 +1,64 @@ +package io.quarkus.redis.datasource.stream; + +import java.util.Map; + +/** + * The result of the xpending command when using the summary form. + *

+ * When using the summary form of xpending, the command outputs the total number of pending messages for this consumer + * group, followed by the smallest and greatest ID among the pending messages, and then list every consumer in the + * consumer group with at least one pending message, and the number of pending messages it has. + */ +public class XPendingSummary { + + private final long pendingCount; + + private final String lowestId; + private final String highestId; + + private final Map consumers; + + public XPendingSummary(long pendingCount, String lowestId, String highestId, Map consumers) { + this.pendingCount = pendingCount; + this.lowestId = lowestId; + this.highestId = highestId; + this.consumers = consumers; + } + + /** + * Gets the number of message waiting for acknowledgement + * + * @return the number of message not yet acknowledged + */ + public long getPendingCount() { + return pendingCount; + } + + /** + * Gets the lowest message id that was not yet acknowledged. + * + * @return the lowest message id + */ + public String getLowestId() { + return lowestId; + } + + /** + * Gets the highest message id that was not yet acknowledged. + * + * @return the highest message id + */ + public String getHighestId() { + return highestId; + } + + /** + * Get the list of every consumer in the consumer group with at least one pending message, + * and the number of pending messages it has. + * + * @return the map composed of consumer -> number of message + */ + public Map getConsumers() { + return consumers; + } +} diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/AbstractStreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/AbstractStreamCommands.java index 18083a00de470..cd2a788c5d3e1 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/AbstractStreamCommands.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/AbstractStreamCommands.java @@ -16,6 +16,7 @@ import io.quarkus.redis.datasource.stream.XClaimArgs; import io.quarkus.redis.datasource.stream.XGroupCreateArgs; import io.quarkus.redis.datasource.stream.XGroupSetIdArgs; +import io.quarkus.redis.datasource.stream.XPendingArgs; import io.quarkus.redis.datasource.stream.XReadArgs; import io.quarkus.redis.datasource.stream.XReadGroupArgs; import io.quarkus.redis.datasource.stream.XTrimArgs; @@ -427,4 +428,40 @@ Uni _xtrim(K key, XTrimArgs args) { return execute(cmd); } + + Uni _xpending(K key, String group) { + nonNull(key, "key"); + nonNull(key, "group"); + + RedisCommand cmd = RedisCommand.of(Command.XPENDING) + .put(marshaller.encode(key)) + .put(group); + return execute(cmd); + } + + Uni _xpending(K key, String group, StreamRange range, int count, XPendingArgs args) { + nonNull(key, "key"); + nonNull(key, "group"); + nonNull(range, "range"); + positive(count, "count"); + + RedisCommand cmd = RedisCommand.of(Command.XPENDING) + .put(marshaller.encode(key)) + .put(group); + + // IDLE must be before the range and count + if (args != null && args.idle() != null) { + cmd.put("IDLE"); + cmd.put(args.idle().toMillis()); + } + + cmd.putArgs(range) + .put(count); + + if (args != null) { + cmd.putArgs(args); + } + + return execute(cmd); + } } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingStreamCommandsImpl.java index da8f62c8048cb..589e123bb8067 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingStreamCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingStreamCommandsImpl.java @@ -6,6 +6,7 @@ import io.quarkus.redis.datasource.RedisDataSource; import io.quarkus.redis.datasource.stream.ClaimedMessages; +import io.quarkus.redis.datasource.stream.PendingMessage; import io.quarkus.redis.datasource.stream.ReactiveStreamCommands; import io.quarkus.redis.datasource.stream.StreamCommands; import io.quarkus.redis.datasource.stream.StreamMessage; @@ -14,6 +15,8 @@ import io.quarkus.redis.datasource.stream.XClaimArgs; import io.quarkus.redis.datasource.stream.XGroupCreateArgs; import io.quarkus.redis.datasource.stream.XGroupSetIdArgs; +import io.quarkus.redis.datasource.stream.XPendingArgs; +import io.quarkus.redis.datasource.stream.XPendingSummary; import io.quarkus.redis.datasource.stream.XReadArgs; import io.quarkus.redis.datasource.stream.XReadGroupArgs; import io.quarkus.redis.datasource.stream.XTrimArgs; @@ -185,4 +188,19 @@ public long xtrim(K key, String threshold) { public long xtrim(K key, XTrimArgs args) { return reactive.xtrim(key, args).await().atMost(timeout); } + + @Override + public XPendingSummary xpending(K key, String group) { + return reactive.xpending(key, group).await().atMost(timeout); + } + + @Override + public List xpending(K key, String group, StreamRange range, int count) { + return reactive.xpending(key, group, range, count).await().atMost(timeout); + } + + @Override + public List xpending(K key, String group, StreamRange range, int count, XPendingArgs args) { + return reactive.xpending(key, group, range, count, args).await().atMost(timeout); + } } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingTransactionalStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingTransactionalStreamCommandsImpl.java index 2c4854c930447..9d01299758e8f 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingTransactionalStreamCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingTransactionalStreamCommandsImpl.java @@ -10,6 +10,7 @@ import io.quarkus.redis.datasource.stream.XClaimArgs; import io.quarkus.redis.datasource.stream.XGroupCreateArgs; import io.quarkus.redis.datasource.stream.XGroupSetIdArgs; +import io.quarkus.redis.datasource.stream.XPendingArgs; import io.quarkus.redis.datasource.stream.XReadArgs; import io.quarkus.redis.datasource.stream.XReadGroupArgs; import io.quarkus.redis.datasource.stream.XTrimArgs; @@ -181,4 +182,19 @@ public void xtrim(K key, String threshold) { public void xtrim(K key, XTrimArgs args) { reactive.xtrim(key, args).await().atMost(timeout); } + + @Override + public void xpending(K key, String group) { + reactive.xpending(key, group).await().atMost(timeout); + } + + @Override + public void xpending(K key, String group, StreamRange range, int count) { + reactive.xpending(key, group, range, count).await().atMost(timeout); + } + + @Override + public void xpending(K key, String group, StreamRange range, int count, XPendingArgs args) { + reactive.xpending(key, group, range, count, args).await().atMost(timeout); + } } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java index ef190787cad42..cc0638a66ee6e 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java @@ -9,6 +9,7 @@ import io.quarkus.redis.datasource.ReactiveRedisCommands; import io.quarkus.redis.datasource.ReactiveRedisDataSource; import io.quarkus.redis.datasource.stream.ClaimedMessages; +import io.quarkus.redis.datasource.stream.PendingMessage; import io.quarkus.redis.datasource.stream.ReactiveStreamCommands; import io.quarkus.redis.datasource.stream.StreamMessage; import io.quarkus.redis.datasource.stream.StreamRange; @@ -16,6 +17,8 @@ import io.quarkus.redis.datasource.stream.XClaimArgs; import io.quarkus.redis.datasource.stream.XGroupCreateArgs; import io.quarkus.redis.datasource.stream.XGroupSetIdArgs; +import io.quarkus.redis.datasource.stream.XPendingArgs; +import io.quarkus.redis.datasource.stream.XPendingSummary; import io.quarkus.redis.datasource.stream.XReadArgs; import io.quarkus.redis.datasource.stream.XReadGroupArgs; import io.quarkus.redis.datasource.stream.XTrimArgs; @@ -327,4 +330,53 @@ public Uni xtrim(K key, XTrimArgs args) { return super._xtrim(key, args) .map(Response::toLong); } + + @Override + public Uni xpending(K key, String group) { + return super._xpending(key, group) + .map(this::decodeAsXPendingSummary); + } + + @Override + public Uni> xpending(K key, String group, StreamRange range, int count) { + return xpending(key, group, range, count, null); + } + + @Override + public Uni> xpending(K key, String group, StreamRange range, int count, XPendingArgs args) { + return super._xpending(key, group, range, count, args) + .map(this::decodeListOfPendingMessages); + } + + protected List decodeListOfPendingMessages(Response r) { + if (r == null) { + return List.of(); + } + List list = new ArrayList<>(); + for (Response response : r) { + var id = response.get(0).toString(); + var name = response.get(1).toString(); + var dur = Duration.ofMillis(response.get(2).toLong()); + var count = response.get(3).toInteger(); + list.add(new PendingMessage(id, name, dur, count)); + } + return list; + } + + protected XPendingSummary decodeAsXPendingSummary(Response r) { + if (r == null) { + return null; + } + + var pending = r.get(0).toLong(); + var lowest = r.get(1).toString(); + var highest = r.get(2).toString(); + + Map consumers = new HashMap<>(); + for (Response nested : r.get(3)) { + consumers.put(nested.get(0).toString(), nested.get(1).toLong()); + } + + return new XPendingSummary(pending, lowest, highest, consumers); + } } diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveTransactionalStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveTransactionalStreamCommandsImpl.java index 06d36b7ac1af2..993f86404f178 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveTransactionalStreamCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveTransactionalStreamCommandsImpl.java @@ -9,6 +9,7 @@ import io.quarkus.redis.datasource.stream.XClaimArgs; import io.quarkus.redis.datasource.stream.XGroupCreateArgs; import io.quarkus.redis.datasource.stream.XGroupSetIdArgs; +import io.quarkus.redis.datasource.stream.XPendingArgs; import io.quarkus.redis.datasource.stream.XReadArgs; import io.quarkus.redis.datasource.stream.XReadGroupArgs; import io.quarkus.redis.datasource.stream.XTrimArgs; @@ -219,4 +220,22 @@ public Uni xtrim(K key, XTrimArgs args) { this.tx.enqueue(Response::toLong); return this.reactive._xtrim(key, args).invoke(this::queuedOrDiscard).replaceWithVoid(); } + + @Override + public Uni xpending(K key, String group) { + this.tx.enqueue(reactive::decodeAsXPendingSummary); + return this.reactive._xpending(key, group).invoke(this::queuedOrDiscard).replaceWithVoid(); + } + + @Override + public Uni xpending(K key, String group, StreamRange range, int count) { + this.tx.enqueue(reactive::decodeListOfPendingMessages); + return this.reactive._xpending(key, group, range, count, null).invoke(this::queuedOrDiscard).replaceWithVoid(); + } + + @Override + public Uni xpending(K key, String group, StreamRange range, int count, XPendingArgs args) { + this.tx.enqueue(reactive::decodeListOfPendingMessages); + return this.reactive._xpending(key, group, range, count, args).invoke(this::queuedOrDiscard).replaceWithVoid(); + } } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java index d1b2c5ac2ba5e..c5fc3497f5142 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java @@ -11,11 +11,13 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import io.quarkus.redis.datasource.stream.PendingMessage; import io.quarkus.redis.datasource.stream.StreamCommands; import io.quarkus.redis.datasource.stream.StreamMessage; import io.quarkus.redis.datasource.stream.StreamRange; @@ -23,6 +25,8 @@ import io.quarkus.redis.datasource.stream.XClaimArgs; import io.quarkus.redis.datasource.stream.XGroupCreateArgs; import io.quarkus.redis.datasource.stream.XGroupSetIdArgs; +import io.quarkus.redis.datasource.stream.XPendingArgs; +import io.quarkus.redis.datasource.stream.XPendingSummary; import io.quarkus.redis.datasource.stream.XReadArgs; import io.quarkus.redis.datasource.stream.XReadGroupArgs; import io.quarkus.redis.datasource.stream.XTrimArgs; @@ -622,7 +626,145 @@ void xGroupSetIdWithArgs() { stream.xgroupSetId(key, "g1", ids.get(50), new XGroupSetIdArgs().entriesRead(1234)); assertThat(stream.xreadgroup("g1", "c2", key, ">")).hasSize(49); + } + + @Test + void xPendingSummaryTest() { + Map payload = Map.of("sensor-id", 1234, "temperature", 19); + for (int i = 0; i < 100; i++) { + stream.xadd(key, payload); + } + + stream.xgroupCreate(key, "my-group", "0-0"); + List> messages = stream.xreadgroup("my-group", "consumer-123", key, ">"); + assertThat(messages).hasSize(100); + + XPendingSummary summary = stream.xpending(key, "my-group"); + assertThat(summary.getPendingCount()).isEqualTo(100L); + assertThat(summary.getHighestId()).isNotNull(); + assertThat(summary.getLowestId()).isNotNull(); + assertThat(summary.getConsumers()).containsExactly(entry("consumer-123", 100L)); + } + + @Test + void xPendingSummaryTestWithTwoConsumers() { + Map payload = Map.of("sensor-id", 1234, "temperature", 19); + for (int i = 0; i < 100; i++) { + stream.xadd(key, payload); + } + + stream.xgroupCreate(key, "my-group", "0-0"); + List> m1 = stream.xreadgroup("my-group", "consumer-1", key, ">"); + List> m2 = stream.xreadgroup("my-group", "consumer-2", key, ">"); + assertThat(m1.size() + m2.size()).isEqualTo(100); + + XPendingSummary summary = stream.xpending(key, "my-group"); + assertThat(summary.getPendingCount()).isEqualTo(100L); + assertThat(summary.getHighestId()).isNotNull(); + assertThat(summary.getLowestId()).isNotNull(); + assertThat(summary.getConsumers()).containsOnlyKeys("consumer-1"); // The second didn't have the chance to poll + } + + @Test + void xPendingExtendedTest() { + Map payload = Map.of("sensor-id", 1234, "temperature", 19); + for (int i = 0; i < 100; i++) { + stream.xadd(key, payload); + } + + stream.xgroupCreate(key, "my-group", "0-0"); + List> messages = stream.xreadgroup("my-group", "consumer-123", key, ">"); + assertThat(messages).hasSize(100); + + List pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10); + assertThat(pending).hasSize(10); + assertThat(pending).allSatisfy(msg -> { + assertThat(msg.getMessageId()).isNotNull(); + assertThat(msg.getDeliveryCount()).isEqualTo(1); + assertThat(msg.getDurationSinceLastDelivery()).isNotNull(); + assertThat(msg.getConsumer()).isEqualTo("consumer-123"); + }); + } + + @Test + void xPendingExtendedWithConsumerTest() { + Map payload = Map.of("sensor-id", 1234, "temperature", 19); + for (int i = 0; i < 100; i++) { + stream.xadd(key, payload); + } + + stream.xgroupCreate(key, "my-group", "0-0"); + stream.xreadgroup("my-group", "consumer-123", key, ">"); + stream.xreadgroup("my-group", "consumer-456", key, ">"); + + List pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10, + new XPendingArgs().consumer("consumer-123")); + assertThat(pending).hasSize(10); + assertThat(pending).allSatisfy(msg -> { + assertThat(msg.getMessageId()).isNotNull(); + assertThat(msg.getDeliveryCount()).isEqualTo(1); + assertThat(msg.getDurationSinceLastDelivery()).isNotNull(); + assertThat(msg.getConsumer()).isEqualTo("consumer-123"); + }); + + pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10, new XPendingArgs().consumer("consumer-456")); + assertThat(pending).isEmpty(); + + pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10, + new XPendingArgs().consumer("consumer-missing")); + assertThat(pending).isEmpty(); + } + + @Test + void xPendingExtendedTestWithConsumerAndIdle() { + Map payload = Map.of("sensor-id", 1234, "temperature", 19); + for (int i = 0; i < 100; i++) { + stream.xadd(key, payload); + } + + stream.xgroupCreate(key, "my-group", "0-0"); + stream.xreadgroup("my-group", "consumer-123", key, ">"); + stream.xreadgroup("my-group", "consumer-456", key, ">"); + + AtomicReference> reference = new AtomicReference<>(); + await().untilAsserted(() -> { + List pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10, new XPendingArgs() + .idle(Duration.ofSeconds(1)) + .consumer("consumer-123")); + assertThat(pending).hasSize(10); + reference.set(pending); + }); + assertThat(reference.get()).allSatisfy(msg -> { + assertThat(msg.getMessageId()).isNotNull(); + assertThat(msg.getDeliveryCount()).isEqualTo(1); + assertThat(msg.getDurationSinceLastDelivery()).isNotNull(); + assertThat(msg.getConsumer()).isEqualTo("consumer-123"); + }); + } + + @Test + void xPendingExtendedTestWithIdle() { + Map payload = Map.of("sensor-id", 1234, "temperature", 19); + for (int i = 0; i < 100; i++) { + stream.xadd(key, payload); + } + stream.xgroupCreate(key, "my-group", "0-0"); + stream.xreadgroup("my-group", "consumer-123", key, ">"); + + AtomicReference> reference = new AtomicReference<>(); + await().untilAsserted(() -> { + List pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10, new XPendingArgs() + .idle(Duration.ofSeconds(1))); + assertThat(pending).hasSize(10); + reference.set(pending); + }); + assertThat(reference.get()).allSatisfy(msg -> { + assertThat(msg.getMessageId()).isNotNull(); + assertThat(msg.getDeliveryCount()).isEqualTo(1); + assertThat(msg.getDurationSinceLastDelivery()).isNotNull(); + assertThat(msg.getConsumer()).isEqualTo("consumer-123"); + }); } } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java index d02ad9187a699..279e1371b458a 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java @@ -5,14 +5,13 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import io.quarkus.redis.datasource.stream.StreamMessage; -import io.quarkus.redis.datasource.stream.TransactionalStreamCommands; -import io.quarkus.redis.datasource.stream.XAddArgs; +import io.quarkus.redis.datasource.stream.*; import io.quarkus.redis.datasource.transactions.TransactionResult; import io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl; import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl; @@ -49,15 +48,27 @@ public void streamBlocking() { stream.xread(key, "0"); // 3 -> 2 messages stream.xgroupCreate(key, "g1", "0"); stream.xreadgroup("g1", "c1", key, ">"); + stream.xpending(key, "g1"); + stream.xpending(key, "g1", StreamRange.of("-", "+"), 10, new XPendingArgs().consumer("c1")); }); - assertThat(result.size()).isEqualTo(5); + assertThat(result.size()).isEqualTo(7); assertThat(result.discarded()).isFalse(); assertThat((String) result.get(0)).isNotBlank(); assertThat((String) result.get(1)).isNotBlank(); + String id1 = result.get(0); + String id2 = result.get(1); + assertThat((List>) result.get(2)).hasSize(2); assertThat((List>) result.get(4)).hasSize(2); + + assertThat(((XPendingSummary) result.get(5)).getPendingCount()).isEqualTo(2); + List list = result.get(6); + + assertThat(((List) result.get(6))).hasSize(2); + List ids = list.stream().map(PendingMessage::getMessageId).collect(Collectors.toList()); + assertThat(ids).containsExactly(id1, id2); } @Test @@ -70,16 +81,28 @@ public void streamReactive() { .chain((x) -> stream.xadd(key, new XAddArgs().nomkstream(), payload)) .chain(x -> stream.xread(key, "0")) .chain(x -> stream.xgroupCreate(key, "g1", "0")) - .chain(x -> stream.xreadgroup("g1", "c1", key, ">")); + .chain(x -> stream.xreadgroup("g1", "c1", key, ">")) + .chain(x -> stream.xpending(key, "g1")) + .chain(x -> stream.xpending(key, "g1", StreamRange.of("-", "+"), 10)); }).await().indefinitely(); - assertThat(result.size()).isEqualTo(5); + assertThat(result.size()).isEqualTo(7); assertThat(result.discarded()).isFalse(); assertThat((String) result.get(0)).isNotBlank(); assertThat((String) result.get(1)).isNotBlank(); + String id1 = result.get(0); + String id2 = result.get(1); + assertThat((List>) result.get(2)).hasSize(2); assertThat((List>) result.get(4)).hasSize(2); + + assertThat(((XPendingSummary) result.get(5)).getPendingCount()).isEqualTo(2); + List list = result.get(6); + + assertThat(((List) result.get(6))).hasSize(2); + List ids = list.stream().map(PendingMessage::getMessageId).collect(Collectors.toList()); + assertThat(ids).containsExactly(id1, id2); } }