diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/PendingMessage.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/PendingMessage.java
new file mode 100644
index 0000000000000..7492a4f92e61c
--- /dev/null
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/PendingMessage.java
@@ -0,0 +1,68 @@
+package io.quarkus.redis.datasource.stream;
+
+import java.time.Duration;
+
+/**
+ * Represents the result of an expended xpending command.
+ * In the extended form we no longer see the summary information, instead there is detailed information for each message
+ * in the pending entries list. For each message four attributes are returned:
+ *
+ * - The ID of the message.
+ * - The name of the consumer that fetched the message and has still to acknowledge it. We call it the current owner of the
+ * message.
+ * - The number of milliseconds that elapsed since the last time this message was delivered to this consumer.
+ * - The number of times this message was delivered.
+ *
+ *
+ * While this structure is mutable, it is highly recommended to use it as a read-only structure.
+ */
+public class PendingMessage {
+ private final String messageId;
+ private final String consumer;
+ private final Duration durationSinceLastDelivery;
+
+ private final int deliveryCount;
+
+ public PendingMessage(String messageId, String consumer, Duration durationSinceLastDelivery, int deliveryCount) {
+ this.messageId = messageId;
+ this.consumer = consumer;
+ this.durationSinceLastDelivery = durationSinceLastDelivery;
+ this.deliveryCount = deliveryCount;
+ }
+
+ /**
+ * Gets the message id.
+ *
+ * @return the message id;
+ */
+ public String getMessageId() {
+ return messageId;
+ }
+
+ /**
+ * Gets the consumer name.
+ *
+ * @return the consumer name
+ */
+ public String getConsumer() {
+ return consumer;
+ }
+
+ /**
+ * Gets the duration since the last delivery attempt.
+ *
+ * @return the duration
+ */
+ public Duration getDurationSinceLastDelivery() {
+ return durationSinceLastDelivery;
+ }
+
+ /**
+ * Gets the number of delivery attempts.
+ *
+ * @return the number of attempts
+ */
+ public int getDeliveryCount() {
+ return deliveryCount;
+ }
+}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveStreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveStreamCommands.java
index 20266c54fbd2d..2d27cd3ce4a5a 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveStreamCommands.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveStreamCommands.java
@@ -590,4 +590,62 @@ Uni>> xreadgroup(String group, String consumer, Map<
*/
Uni xtrim(K key, XTrimArgs args);
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the summary form.
+ *
+ * @param key the key
+ * @param group the group
+ * @return A {@link io.smallrye.mutiny.Uni} emitting the xpending summary
+ */
+ Uni xpending(K key, String group);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the extended form.
+ *
+ * @param key the key
+ * @param group the group
+ * @param range the range
+ * @param count the number of message to include
+ * @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages
+ */
+ Uni> xpending(K key, String group, StreamRange range, int count);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the extended form.
+ *
+ * If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item.
+ *
+ * @param key the key
+ * @param group the group
+ * @param range the range
+ * @param count the number of message to include
+ * @param args the extra argument (idle and consumer)
+ * @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages
+ */
+ Uni> xpending(K key, String group, StreamRange range, int count, XPendingArgs args);
+
}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveTransactionalStreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveTransactionalStreamCommands.java
index 75c5f1d41419b..a15e7548c0d7e 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveTransactionalStreamCommands.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/ReactiveTransactionalStreamCommands.java
@@ -1,7 +1,6 @@
package io.quarkus.redis.datasource.stream;
import java.time.Duration;
-import java.util.List;
import java.util.Map;
import io.quarkus.redis.datasource.ReactiveTransactionalRedisCommands;
@@ -614,4 +613,62 @@ public interface ReactiveTransactionalStreamCommands extends ReactiveTr
* otherwise. In the case of failure, the transaction is discarded.
*/
Uni xtrim(K key, XTrimArgs args);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the summary form.
+ *
+ * @param key the key
+ * @param group the group
+ * @return A {@link io.smallrye.mutiny.Uni} emitting the xpending summary
+ */
+ Uni xpending(K key, String group);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the extended form.
+ *
+ * @param key the key
+ * @param group the group
+ * @param range the range
+ * @param count the number of message to include
+ * @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages
+ */
+ Uni xpending(K key, String group, StreamRange range, int count);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the extended form.
+ *
+ * If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item.
+ *
+ * @param key the key
+ * @param group the group
+ * @param range the range
+ * @param count the number of message to include
+ * @param args the extra argument (idle and consumer)
+ * @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages
+ */
+ Uni xpending(K key, String group, StreamRange range, int count, XPendingArgs args);
}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/StreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/StreamCommands.java
index 4b5792649c146..f195e7fbd7665 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/StreamCommands.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/StreamCommands.java
@@ -582,4 +582,62 @@ List> xreadgroup(String group, String consumer, MapXPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the summary form.
+ *
+ * @param key the key
+ * @param group the group
+ * @return the xpending summary
+ */
+ XPendingSummary xpending(K key, String group);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the extended form.
+ *
+ * @param key the key
+ * @param group the group
+ * @param range the range
+ * @param count the number of message to include
+ * @return the list of pending messages
+ */
+ List xpending(K key, String group, StreamRange range, int count);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the extended form.
+ *
+ * If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item.
+ *
+ * @param key the key
+ * @param group the group
+ * @param range the range
+ * @param count the number of message to include
+ * @param args the extra argument (idle and consumer)
+ * @return the list of pending messages
+ */
+ List xpending(K key, String group, StreamRange range, int count, XPendingArgs args);
}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/TransactionalStreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/TransactionalStreamCommands.java
index a72b9f9a66074..3fa742ce92074 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/TransactionalStreamCommands.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/TransactionalStreamCommands.java
@@ -1,7 +1,6 @@
package io.quarkus.redis.datasource.stream;
import java.time.Duration;
-import java.util.List;
import java.util.Map;
import io.quarkus.redis.datasource.TransactionalRedisCommands;
@@ -551,4 +550,59 @@ public interface TransactionalStreamCommands extends TransactionalRedis
* @param args the extra parameters
*/
void xtrim(K key, XTrimArgs args);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the summary form.
+ *
+ * @param key the key
+ * @param group the group
+ */
+ void xpending(K key, String group);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the extended form.
+ *
+ * @param key the key
+ * @param group the group
+ * @param range the range
+ * @param count the number of message to include
+ */
+ void xpending(K key, String group, StreamRange range, int count);
+
+ /**
+ * Execute the command XPENDING.
+ * Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
+ * important command in order to observe and understand what is happening with a streams consumer groups: what
+ * clients are active, what messages are pending to be consumed, or to see if there are idle messages.
+ *
+ * Group: stream
+ * Requires Redis 5.0.0+
+ *
+ * This variant of xpending uses the extended form.
+ *
+ * If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item.
+ *
+ * @param key the key
+ * @param group the group
+ * @param range the range
+ * @param count the number of message to include
+ * @param args the extra argument (idle and consumer)
+ */
+ void xpending(K key, String group, StreamRange range, int count, XPendingArgs args);
}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingArgs.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingArgs.java
new file mode 100644
index 0000000000000..e5c1db2afadbf
--- /dev/null
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingArgs.java
@@ -0,0 +1,51 @@
+package io.quarkus.redis.datasource.stream;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import io.quarkus.redis.datasource.RedisCommandExtraArguments;
+
+public class XPendingArgs implements RedisCommandExtraArguments {
+
+ private String owner;
+
+ private Duration idle;
+
+ /**
+ * Sets the specific owner of the message
+ *
+ * @param owner the name of the consumer
+ * @return the current {@code XPendingArgs}
+ */
+ public XPendingArgs consumer(String owner) {
+ this.owner = owner;
+ return this;
+ }
+
+ /**
+ * Filters pending stream entries by their idle-time.
+ *
+ * @param idle the duration
+ * @return the current {@code XPendingArgs}
+ */
+ public XPendingArgs idle(Duration idle) {
+ this.idle = idle;
+ return this;
+ }
+
+ public Duration idle() {
+ return idle;
+ }
+
+ @Override
+ public List toArgs() {
+ List args = new ArrayList<>();
+
+ if (owner != null) {
+ args.add(owner);
+ }
+
+ return args;
+ }
+}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java
new file mode 100644
index 0000000000000..ee61127ca9751
--- /dev/null
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/stream/XPendingSummary.java
@@ -0,0 +1,64 @@
+package io.quarkus.redis.datasource.stream;
+
+import java.util.Map;
+
+/**
+ * The result of the xpending command when using the summary form.
+ *
+ * When using the summary form of xpending, the command outputs the total number of pending messages for this consumer
+ * group, followed by the smallest and greatest ID among the pending messages, and then list every consumer in the
+ * consumer group with at least one pending message, and the number of pending messages it has.
+ */
+public class XPendingSummary {
+
+ private final long pendingCount;
+
+ private final String lowestId;
+ private final String highestId;
+
+ private final Map consumers;
+
+ public XPendingSummary(long pendingCount, String lowestId, String highestId, Map consumers) {
+ this.pendingCount = pendingCount;
+ this.lowestId = lowestId;
+ this.highestId = highestId;
+ this.consumers = consumers;
+ }
+
+ /**
+ * Gets the number of message waiting for acknowledgement
+ *
+ * @return the number of message not yet acknowledged
+ */
+ public long getPendingCount() {
+ return pendingCount;
+ }
+
+ /**
+ * Gets the lowest message id that was not yet acknowledged.
+ *
+ * @return the lowest message id
+ */
+ public String getLowestId() {
+ return lowestId;
+ }
+
+ /**
+ * Gets the highest message id that was not yet acknowledged.
+ *
+ * @return the highest message id
+ */
+ public String getHighestId() {
+ return highestId;
+ }
+
+ /**
+ * Get the list of every consumer in the consumer group with at least one pending message,
+ * and the number of pending messages it has.
+ *
+ * @return the map composed of consumer -> number of message
+ */
+ public Map getConsumers() {
+ return consumers;
+ }
+}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/AbstractStreamCommands.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/AbstractStreamCommands.java
index 18083a00de470..cd2a788c5d3e1 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/AbstractStreamCommands.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/AbstractStreamCommands.java
@@ -16,6 +16,7 @@
import io.quarkus.redis.datasource.stream.XClaimArgs;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XGroupSetIdArgs;
+import io.quarkus.redis.datasource.stream.XPendingArgs;
import io.quarkus.redis.datasource.stream.XReadArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.redis.datasource.stream.XTrimArgs;
@@ -427,4 +428,40 @@ Uni _xtrim(K key, XTrimArgs args) {
return execute(cmd);
}
+
+ Uni _xpending(K key, String group) {
+ nonNull(key, "key");
+ nonNull(key, "group");
+
+ RedisCommand cmd = RedisCommand.of(Command.XPENDING)
+ .put(marshaller.encode(key))
+ .put(group);
+ return execute(cmd);
+ }
+
+ Uni _xpending(K key, String group, StreamRange range, int count, XPendingArgs args) {
+ nonNull(key, "key");
+ nonNull(key, "group");
+ nonNull(range, "range");
+ positive(count, "count");
+
+ RedisCommand cmd = RedisCommand.of(Command.XPENDING)
+ .put(marshaller.encode(key))
+ .put(group);
+
+ // IDLE must be before the range and count
+ if (args != null && args.idle() != null) {
+ cmd.put("IDLE");
+ cmd.put(args.idle().toMillis());
+ }
+
+ cmd.putArgs(range)
+ .put(count);
+
+ if (args != null) {
+ cmd.putArgs(args);
+ }
+
+ return execute(cmd);
+ }
}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingStreamCommandsImpl.java
index da8f62c8048cb..589e123bb8067 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingStreamCommandsImpl.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingStreamCommandsImpl.java
@@ -6,6 +6,7 @@
import io.quarkus.redis.datasource.RedisDataSource;
import io.quarkus.redis.datasource.stream.ClaimedMessages;
+import io.quarkus.redis.datasource.stream.PendingMessage;
import io.quarkus.redis.datasource.stream.ReactiveStreamCommands;
import io.quarkus.redis.datasource.stream.StreamCommands;
import io.quarkus.redis.datasource.stream.StreamMessage;
@@ -14,6 +15,8 @@
import io.quarkus.redis.datasource.stream.XClaimArgs;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XGroupSetIdArgs;
+import io.quarkus.redis.datasource.stream.XPendingArgs;
+import io.quarkus.redis.datasource.stream.XPendingSummary;
import io.quarkus.redis.datasource.stream.XReadArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.redis.datasource.stream.XTrimArgs;
@@ -185,4 +188,19 @@ public long xtrim(K key, String threshold) {
public long xtrim(K key, XTrimArgs args) {
return reactive.xtrim(key, args).await().atMost(timeout);
}
+
+ @Override
+ public XPendingSummary xpending(K key, String group) {
+ return reactive.xpending(key, group).await().atMost(timeout);
+ }
+
+ @Override
+ public List xpending(K key, String group, StreamRange range, int count) {
+ return reactive.xpending(key, group, range, count).await().atMost(timeout);
+ }
+
+ @Override
+ public List xpending(K key, String group, StreamRange range, int count, XPendingArgs args) {
+ return reactive.xpending(key, group, range, count, args).await().atMost(timeout);
+ }
}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingTransactionalStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingTransactionalStreamCommandsImpl.java
index 2c4854c930447..9d01299758e8f 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingTransactionalStreamCommandsImpl.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingTransactionalStreamCommandsImpl.java
@@ -10,6 +10,7 @@
import io.quarkus.redis.datasource.stream.XClaimArgs;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XGroupSetIdArgs;
+import io.quarkus.redis.datasource.stream.XPendingArgs;
import io.quarkus.redis.datasource.stream.XReadArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.redis.datasource.stream.XTrimArgs;
@@ -181,4 +182,19 @@ public void xtrim(K key, String threshold) {
public void xtrim(K key, XTrimArgs args) {
reactive.xtrim(key, args).await().atMost(timeout);
}
+
+ @Override
+ public void xpending(K key, String group) {
+ reactive.xpending(key, group).await().atMost(timeout);
+ }
+
+ @Override
+ public void xpending(K key, String group, StreamRange range, int count) {
+ reactive.xpending(key, group, range, count).await().atMost(timeout);
+ }
+
+ @Override
+ public void xpending(K key, String group, StreamRange range, int count, XPendingArgs args) {
+ reactive.xpending(key, group, range, count, args).await().atMost(timeout);
+ }
}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java
index ef190787cad42..cc0638a66ee6e 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.java
@@ -9,6 +9,7 @@
import io.quarkus.redis.datasource.ReactiveRedisCommands;
import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.stream.ClaimedMessages;
+import io.quarkus.redis.datasource.stream.PendingMessage;
import io.quarkus.redis.datasource.stream.ReactiveStreamCommands;
import io.quarkus.redis.datasource.stream.StreamMessage;
import io.quarkus.redis.datasource.stream.StreamRange;
@@ -16,6 +17,8 @@
import io.quarkus.redis.datasource.stream.XClaimArgs;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XGroupSetIdArgs;
+import io.quarkus.redis.datasource.stream.XPendingArgs;
+import io.quarkus.redis.datasource.stream.XPendingSummary;
import io.quarkus.redis.datasource.stream.XReadArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.redis.datasource.stream.XTrimArgs;
@@ -327,4 +330,53 @@ public Uni xtrim(K key, XTrimArgs args) {
return super._xtrim(key, args)
.map(Response::toLong);
}
+
+ @Override
+ public Uni xpending(K key, String group) {
+ return super._xpending(key, group)
+ .map(this::decodeAsXPendingSummary);
+ }
+
+ @Override
+ public Uni> xpending(K key, String group, StreamRange range, int count) {
+ return xpending(key, group, range, count, null);
+ }
+
+ @Override
+ public Uni> xpending(K key, String group, StreamRange range, int count, XPendingArgs args) {
+ return super._xpending(key, group, range, count, args)
+ .map(this::decodeListOfPendingMessages);
+ }
+
+ protected List decodeListOfPendingMessages(Response r) {
+ if (r == null) {
+ return List.of();
+ }
+ List list = new ArrayList<>();
+ for (Response response : r) {
+ var id = response.get(0).toString();
+ var name = response.get(1).toString();
+ var dur = Duration.ofMillis(response.get(2).toLong());
+ var count = response.get(3).toInteger();
+ list.add(new PendingMessage(id, name, dur, count));
+ }
+ return list;
+ }
+
+ protected XPendingSummary decodeAsXPendingSummary(Response r) {
+ if (r == null) {
+ return null;
+ }
+
+ var pending = r.get(0).toLong();
+ var lowest = r.get(1).toString();
+ var highest = r.get(2).toString();
+
+ Map consumers = new HashMap<>();
+ for (Response nested : r.get(3)) {
+ consumers.put(nested.get(0).toString(), nested.get(1).toLong());
+ }
+
+ return new XPendingSummary(pending, lowest, highest, consumers);
+ }
}
diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveTransactionalStreamCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveTransactionalStreamCommandsImpl.java
index 06d36b7ac1af2..be7e5a9ff85df 100644
--- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveTransactionalStreamCommandsImpl.java
+++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveTransactionalStreamCommandsImpl.java
@@ -9,6 +9,7 @@
import io.quarkus.redis.datasource.stream.XClaimArgs;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XGroupSetIdArgs;
+import io.quarkus.redis.datasource.stream.XPendingArgs;
import io.quarkus.redis.datasource.stream.XReadArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.redis.datasource.stream.XTrimArgs;
@@ -219,4 +220,22 @@ public Uni xtrim(K key, XTrimArgs args) {
this.tx.enqueue(Response::toLong);
return this.reactive._xtrim(key, args).invoke(this::queuedOrDiscard).replaceWithVoid();
}
+
+ @Override
+ public Uni xpending(K key, String group) {
+ this.tx.enqueue(reactive::decodeAsXPendingSummary);
+ return this.reactive._xpending(key, group).invoke(this::queuedOrDiscard).replaceWithVoid();
+ }
+
+ @Override
+ public Uni xpending(K key, String group, StreamRange range, int count) {
+ this.tx.enqueue(reactive::decodeListOfPendingMessages);
+ return this.reactive._xpending(key, group, range, count, null).invoke(this::queuedOrDiscard).replaceWithVoid();
+ }
+
+ @Override
+ public Uni xpending(K key, String group, StreamRange range, int count, XPendingArgs args) {
+ this.tx.enqueue(reactive::decodeAsXPendingSummary);
+ return this.reactive._xpending(key, group, range, count, args).invoke(this::queuedOrDiscard).replaceWithVoid();
+ }
}
diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java
index d1b2c5ac2ba5e..c5fc3497f5142 100644
--- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java
+++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StreamCommandsTest.java
@@ -11,11 +11,13 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import io.quarkus.redis.datasource.stream.PendingMessage;
import io.quarkus.redis.datasource.stream.StreamCommands;
import io.quarkus.redis.datasource.stream.StreamMessage;
import io.quarkus.redis.datasource.stream.StreamRange;
@@ -23,6 +25,8 @@
import io.quarkus.redis.datasource.stream.XClaimArgs;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XGroupSetIdArgs;
+import io.quarkus.redis.datasource.stream.XPendingArgs;
+import io.quarkus.redis.datasource.stream.XPendingSummary;
import io.quarkus.redis.datasource.stream.XReadArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.redis.datasource.stream.XTrimArgs;
@@ -622,7 +626,145 @@ void xGroupSetIdWithArgs() {
stream.xgroupSetId(key, "g1", ids.get(50), new XGroupSetIdArgs().entriesRead(1234));
assertThat(stream.xreadgroup("g1", "c2", key, ">")).hasSize(49);
+ }
+
+ @Test
+ void xPendingSummaryTest() {
+ Map payload = Map.of("sensor-id", 1234, "temperature", 19);
+ for (int i = 0; i < 100; i++) {
+ stream.xadd(key, payload);
+ }
+
+ stream.xgroupCreate(key, "my-group", "0-0");
+ List> messages = stream.xreadgroup("my-group", "consumer-123", key, ">");
+ assertThat(messages).hasSize(100);
+
+ XPendingSummary summary = stream.xpending(key, "my-group");
+ assertThat(summary.getPendingCount()).isEqualTo(100L);
+ assertThat(summary.getHighestId()).isNotNull();
+ assertThat(summary.getLowestId()).isNotNull();
+ assertThat(summary.getConsumers()).containsExactly(entry("consumer-123", 100L));
+ }
+
+ @Test
+ void xPendingSummaryTestWithTwoConsumers() {
+ Map payload = Map.of("sensor-id", 1234, "temperature", 19);
+ for (int i = 0; i < 100; i++) {
+ stream.xadd(key, payload);
+ }
+
+ stream.xgroupCreate(key, "my-group", "0-0");
+ List> m1 = stream.xreadgroup("my-group", "consumer-1", key, ">");
+ List> m2 = stream.xreadgroup("my-group", "consumer-2", key, ">");
+ assertThat(m1.size() + m2.size()).isEqualTo(100);
+
+ XPendingSummary summary = stream.xpending(key, "my-group");
+ assertThat(summary.getPendingCount()).isEqualTo(100L);
+ assertThat(summary.getHighestId()).isNotNull();
+ assertThat(summary.getLowestId()).isNotNull();
+ assertThat(summary.getConsumers()).containsOnlyKeys("consumer-1"); // The second didn't have the chance to poll
+ }
+
+ @Test
+ void xPendingExtendedTest() {
+ Map payload = Map.of("sensor-id", 1234, "temperature", 19);
+ for (int i = 0; i < 100; i++) {
+ stream.xadd(key, payload);
+ }
+
+ stream.xgroupCreate(key, "my-group", "0-0");
+ List> messages = stream.xreadgroup("my-group", "consumer-123", key, ">");
+ assertThat(messages).hasSize(100);
+
+ List pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10);
+ assertThat(pending).hasSize(10);
+ assertThat(pending).allSatisfy(msg -> {
+ assertThat(msg.getMessageId()).isNotNull();
+ assertThat(msg.getDeliveryCount()).isEqualTo(1);
+ assertThat(msg.getDurationSinceLastDelivery()).isNotNull();
+ assertThat(msg.getConsumer()).isEqualTo("consumer-123");
+ });
+ }
+
+ @Test
+ void xPendingExtendedWithConsumerTest() {
+ Map payload = Map.of("sensor-id", 1234, "temperature", 19);
+ for (int i = 0; i < 100; i++) {
+ stream.xadd(key, payload);
+ }
+
+ stream.xgroupCreate(key, "my-group", "0-0");
+ stream.xreadgroup("my-group", "consumer-123", key, ">");
+ stream.xreadgroup("my-group", "consumer-456", key, ">");
+
+ List pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10,
+ new XPendingArgs().consumer("consumer-123"));
+ assertThat(pending).hasSize(10);
+ assertThat(pending).allSatisfy(msg -> {
+ assertThat(msg.getMessageId()).isNotNull();
+ assertThat(msg.getDeliveryCount()).isEqualTo(1);
+ assertThat(msg.getDurationSinceLastDelivery()).isNotNull();
+ assertThat(msg.getConsumer()).isEqualTo("consumer-123");
+ });
+
+ pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10, new XPendingArgs().consumer("consumer-456"));
+ assertThat(pending).isEmpty();
+
+ pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10,
+ new XPendingArgs().consumer("consumer-missing"));
+ assertThat(pending).isEmpty();
+ }
+
+ @Test
+ void xPendingExtendedTestWithConsumerAndIdle() {
+ Map payload = Map.of("sensor-id", 1234, "temperature", 19);
+ for (int i = 0; i < 100; i++) {
+ stream.xadd(key, payload);
+ }
+
+ stream.xgroupCreate(key, "my-group", "0-0");
+ stream.xreadgroup("my-group", "consumer-123", key, ">");
+ stream.xreadgroup("my-group", "consumer-456", key, ">");
+
+ AtomicReference> reference = new AtomicReference<>();
+ await().untilAsserted(() -> {
+ List pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10, new XPendingArgs()
+ .idle(Duration.ofSeconds(1))
+ .consumer("consumer-123"));
+ assertThat(pending).hasSize(10);
+ reference.set(pending);
+ });
+ assertThat(reference.get()).allSatisfy(msg -> {
+ assertThat(msg.getMessageId()).isNotNull();
+ assertThat(msg.getDeliveryCount()).isEqualTo(1);
+ assertThat(msg.getDurationSinceLastDelivery()).isNotNull();
+ assertThat(msg.getConsumer()).isEqualTo("consumer-123");
+ });
+ }
+
+ @Test
+ void xPendingExtendedTestWithIdle() {
+ Map payload = Map.of("sensor-id", 1234, "temperature", 19);
+ for (int i = 0; i < 100; i++) {
+ stream.xadd(key, payload);
+ }
+ stream.xgroupCreate(key, "my-group", "0-0");
+ stream.xreadgroup("my-group", "consumer-123", key, ">");
+
+ AtomicReference> reference = new AtomicReference<>();
+ await().untilAsserted(() -> {
+ List pending = stream.xpending(key, "my-group", StreamRange.of("-", "+"), 10, new XPendingArgs()
+ .idle(Duration.ofSeconds(1)));
+ assertThat(pending).hasSize(10);
+ reference.set(pending);
+ });
+ assertThat(reference.get()).allSatisfy(msg -> {
+ assertThat(msg.getMessageId()).isNotNull();
+ assertThat(msg.getDeliveryCount()).isEqualTo(1);
+ assertThat(msg.getDurationSinceLastDelivery()).isNotNull();
+ assertThat(msg.getConsumer()).isEqualTo("consumer-123");
+ });
}
}
diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java
index d02ad9187a699..b537b1c15779d 100644
--- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java
+++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStreamCommandsTest.java
@@ -5,14 +5,18 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import io.quarkus.redis.datasource.stream.PendingMessage;
import io.quarkus.redis.datasource.stream.StreamMessage;
+import io.quarkus.redis.datasource.stream.StreamRange;
import io.quarkus.redis.datasource.stream.TransactionalStreamCommands;
import io.quarkus.redis.datasource.stream.XAddArgs;
+import io.quarkus.redis.datasource.stream.XPendingSummary;
import io.quarkus.redis.datasource.transactions.TransactionResult;
import io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl;
import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl;
@@ -49,15 +53,27 @@ public void streamBlocking() {
stream.xread(key, "0"); // 3 -> 2 messages
stream.xgroupCreate(key, "g1", "0");
stream.xreadgroup("g1", "c1", key, ">");
+ stream.xpending(key, "g1");
+ stream.xpending(key, "g1", StreamRange.of("-", "+"), 10);
});
- assertThat(result.size()).isEqualTo(5);
+ assertThat(result.size()).isEqualTo(7);
assertThat(result.discarded()).isFalse();
assertThat((String) result.get(0)).isNotBlank();
assertThat((String) result.get(1)).isNotBlank();
+ String id1 = result.get(0);
+ String id2 = result.get(1);
+
assertThat((List>) result.get(2)).hasSize(2);
assertThat((List>) result.get(4)).hasSize(2);
+
+ assertThat(((XPendingSummary) result.get(5)).getPendingCount()).isEqualTo(2);
+ List list = result.get(6);
+
+ assertThat(((List) result.get(6))).hasSize(2);
+ List ids = list.stream().map(PendingMessage::getMessageId).collect(Collectors.toList());
+ assertThat(ids).containsExactly(id1, id2);
}
@Test
@@ -70,16 +86,28 @@ public void streamReactive() {
.chain((x) -> stream.xadd(key, new XAddArgs().nomkstream(), payload))
.chain(x -> stream.xread(key, "0"))
.chain(x -> stream.xgroupCreate(key, "g1", "0"))
- .chain(x -> stream.xreadgroup("g1", "c1", key, ">"));
+ .chain(x -> stream.xreadgroup("g1", "c1", key, ">"))
+ .chain(x -> stream.xpending(key, "g1"))
+ .chain(x -> stream.xpending(key, "g1", StreamRange.of("-", "+"), 10));
}).await().indefinitely();
- assertThat(result.size()).isEqualTo(5);
+ assertThat(result.size()).isEqualTo(7);
assertThat(result.discarded()).isFalse();
assertThat((String) result.get(0)).isNotBlank();
assertThat((String) result.get(1)).isNotBlank();
+ String id1 = result.get(0);
+ String id2 = result.get(1);
+
assertThat((List>) result.get(2)).hasSize(2);
assertThat((List>) result.get(4)).hasSize(2);
+
+ assertThat(((XPendingSummary) result.get(5)).getPendingCount()).isEqualTo(2);
+ List list = result.get(6);
+
+ assertThat(((List) result.get(6))).hasSize(2);
+ List ids = list.stream().map(PendingMessage::getMessageId).collect(Collectors.toList());
+ assertThat(ids).containsExactly(id1, id2);
}
}