Skip to content

Commit

Permalink
Implement XPending redis command
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Jul 7, 2023
1 parent f92ac9a commit 41541cc
Show file tree
Hide file tree
Showing 14 changed files with 725 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.quarkus.redis.datasource.stream;

import java.time.Duration;

/**
* Represents the result of an expended xpending command.
* In the extended form we no longer see the summary information, instead there is detailed information for each message
* in the pending entries list. For each message four attributes are returned:
* <ul>
* <li>The ID of the message.</li>
* <li>The name of the consumer that fetched the message and has still to acknowledge it. We call it the current owner of the
* message.</li>
* <li>The number of milliseconds that elapsed since the last time this message was delivered to this consumer.</li>
* <li>The number of times this message was delivered.</li>
* </ul>
* <p>
* While this structure is mutable, it is highly recommended to use it as a read-only structure.
*/
public class PendingMessage {
private final String messageId;
private final String consumer;
private final Duration durationSinceLastDelivery;

private final int deliveryCount;

public PendingMessage(String messageId, String consumer, Duration durationSinceLastDelivery, int deliveryCount) {
this.messageId = messageId;
this.consumer = consumer;
this.durationSinceLastDelivery = durationSinceLastDelivery;
this.deliveryCount = deliveryCount;
}

/**
* Gets the message id.
*
* @return the message id;
*/
public String getMessageId() {
return messageId;
}

/**
* Gets the consumer name.
*
* @return the consumer name
*/
public String getConsumer() {
return consumer;
}

/**
* Gets the duration since the last delivery attempt.
*
* @return the duration
*/
public Duration getDurationSinceLastDelivery() {
return durationSinceLastDelivery;
}

/**
* Gets the number of delivery attempts.
*
* @return the number of attempts
*/
public int getDeliveryCount() {
return deliveryCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -590,4 +590,62 @@ Uni<List<StreamMessage<K, F, V>>> xreadgroup(String group, String consumer, Map<
*/
Uni<Long> xtrim(K key, XTrimArgs args);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>summary</em> form.
*
* @param key the key
* @param group the group
* @return A {@link io.smallrye.mutiny.Uni} emitting the xpending summary
*/
Uni<XPendingSummary> xpending(K key, String group);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>extended</em> form.
*
* @param key the key
* @param group the group
* @param range the range
* @param count the number of message to include
* @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages
*/
Uni<List<PendingMessage>> xpending(K key, String group, StreamRange range, int count);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>extended</em> form.
*
* If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item.
*
* @param key the key
* @param group the group
* @param range the range
* @param count the number of message to include
* @param args the extra argument (idle and consumer)
* @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages
*/
Uni<List<PendingMessage>> xpending(K key, String group, StreamRange range, int count, XPendingArgs args);

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.quarkus.redis.datasource.stream;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import io.quarkus.redis.datasource.ReactiveTransactionalRedisCommands;
Expand Down Expand Up @@ -614,4 +613,62 @@ public interface ReactiveTransactionalStreamCommands<K, F, V> extends ReactiveTr
* otherwise. In the case of failure, the transaction is discarded.
*/
Uni<Void> xtrim(K key, XTrimArgs args);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>summary</em> form.
*
* @param key the key
* @param group the group
* @return A {@link io.smallrye.mutiny.Uni} emitting the xpending summary
*/
Uni<Void> xpending(K key, String group);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>extended</em> form.
*
* @param key the key
* @param group the group
* @param range the range
* @param count the number of message to include
* @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages
*/
Uni<Void> xpending(K key, String group, StreamRange range, int count);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>extended</em> form.
* <p>
* If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item.
*
* @param key the key
* @param group the group
* @param range the range
* @param count the number of message to include
* @param args the extra argument (idle and consumer)
* @return A {@link io.smallrye.mutiny.Uni} emitting the list of pending messages
*/
Uni<Void> xpending(K key, String group, StreamRange range, int count, XPendingArgs args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -582,4 +582,62 @@ List<StreamMessage<K, F, V>> xreadgroup(String group, String consumer, Map<K, St
* @return A {@link io.smallrye.mutiny.Uni} emitting the number of entries deleted from the stream
*/
long xtrim(K key, XTrimArgs args);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>summary</em> form.
*
* @param key the key
* @param group the group
* @return the xpending summary
*/
XPendingSummary xpending(K key, String group);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>extended</em> form.
*
* @param key the key
* @param group the group
* @param range the range
* @param count the number of message to include
* @return the list of pending messages
*/
List<PendingMessage> xpending(K key, String group, StreamRange range, int count);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>extended</em> form.
*
* If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item.
*
* @param key the key
* @param group the group
* @param range the range
* @param count the number of message to include
* @param args the extra argument (idle and consumer)
* @return the list of pending messages
*/
List<PendingMessage> xpending(K key, String group, StreamRange range, int count, XPendingArgs args);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.quarkus.redis.datasource.stream;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import io.quarkus.redis.datasource.TransactionalRedisCommands;
Expand Down Expand Up @@ -551,4 +550,59 @@ public interface TransactionalStreamCommands<K, F, V> extends TransactionalRedis
* @param args the extra parameters
*/
void xtrim(K key, XTrimArgs args);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>summary</em> form.
*
* @param key the key
* @param group the group
*/
void xpending(K key, String group);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>extended</em> form.
*
* @param key the key
* @param group the group
* @param range the range
* @param count the number of message to include
*/
void xpending(K key, String group, StreamRange range, int count);

/**
* Execute the command <a href="https://redis.io/commands/xpending">XPENDING</a>.
* Summary: The XPENDING command is the interface to inspect the list of pending messages, and is as thus a very
* important command in order to observe and understand what is happening with a streams consumer groups: what
* clients are active, what messages are pending to be consumed, or to see if there are idle messages.
* <p>
* Group: stream
* Requires Redis 5.0.0+
* <p>
* This variant of xpending uses the <em>extended</em> form.
* <p>
* If the extra parameter include the name of the consumer, the produced list will only contain 0 or 1 item.
*
* @param key the key
* @param group the group
* @param range the range
* @param count the number of message to include
* @param args the extra argument (idle and consumer)
*/
void xpending(K key, String group, StreamRange range, int count, XPendingArgs args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.redis.datasource.stream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import io.quarkus.redis.datasource.RedisCommandExtraArguments;

public class XPendingArgs implements RedisCommandExtraArguments {

private String owner;

private Duration idle;

/**
* Sets the specific owner of the message
*
* @param owner the name of the consumer
* @return the current {@code XPendingArgs}
*/
public XPendingArgs consumer(String owner) {
this.owner = owner;
return this;
}

/**
* Filters pending stream entries by their idle-time.
*
* @param idle the duration
* @return the current {@code XPendingArgs}
*/
public XPendingArgs idle(Duration idle) {
this.idle = idle;
return this;
}

public Duration idle() {
return idle;
}

@Override
public List<String> toArgs() {
List<String> args = new ArrayList<>();

if (owner != null) {
args.add(owner);
}

return args;
}
}
Loading

0 comments on commit 41541cc

Please sign in to comment.