diff --git a/StackExchange.Redis.Tests/DatabaseWrapperTests.cs b/StackExchange.Redis.Tests/DatabaseWrapperTests.cs index 1e584073d..a388821d7 100644 --- a/StackExchange.Redis.Tests/DatabaseWrapperTests.cs +++ b/StackExchange.Redis.Tests/DatabaseWrapperTests.cs @@ -782,6 +782,145 @@ public void SortedSetScore() mock.Verify(_ => _.SortedSetScore("prefix:key", "member", CommandFlags.HighPriority)); } + [Fact] + public void StreamAcknowledge_1() + { + wrapper.StreamAcknowledge("key", "group", "0-0", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamAcknowledge("prefix:key", "group", "0-0", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamAcknowledge_2() + { + var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" }; + wrapper.StreamAcknowledge("key", "group", messageIds, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamAcknowledge("prefix:key", "group", messageIds, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamAdd_1() + { + wrapper.StreamAdd("key", "field1", "value1", "*", 1000, true, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamAdd("prefix:key", "field1", "value1", "*", 1000, true, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamAdd_2() + { + var fields = new NameValueEntry[0]; + wrapper.StreamAdd("key", fields, "*", 1000, true, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamAdd("prefix:key", fields, "*", 1000, true, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamClaimMessages() + { + var messageIds = new RedisValue[0]; + wrapper.StreamClaim("key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamClaim("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamClaimMessagesReturningIds() + { + var messageIds = new RedisValue[0]; + wrapper.StreamClaimIdsOnly("key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamClaimIdsOnly("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamConsumerInfoGet() + { + wrapper.StreamConsumerInfo("key", "group", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamConsumerInfo("prefix:key", "group", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamCreateConsumerGroup() + { + wrapper.StreamCreateConsumerGroup("key", "group", "0-0", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamCreateConsumerGroup("prefix:key", "group", "0-0", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamGroupInfoGet() + { + wrapper.StreamGroupInfo("key", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamGroupInfo("prefix:key", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamInfoGet() + { + wrapper.StreamInfo("key", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamInfo("prefix:key", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamLength() + { + wrapper.StreamLength("key", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamLength("prefix:key", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamMessagesDelete() + { + var messageIds = new RedisValue[0] { }; + wrapper.StreamDelete("key", messageIds, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamDelete("prefix:key", messageIds, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamPendingInfoGet() + { + wrapper.StreamPending("key", "group", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamPending("prefix:key", "group", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamPendingMessageInfoGet() + { + wrapper.StreamPendingMessages("key", "group", 10, RedisValue.Null, null, null, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, null, null, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamRange() + { + wrapper.StreamRange("key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamRange("prefix:key", "-", "+",null, Order.Ascending, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamRead_1() + { + var keysAndIds = new StreamIdPair[0] { }; + wrapper.StreamRead(keysAndIds, null, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamRead(keysAndIds, null, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamRead_2() + { + wrapper.StreamRead("key", "0-0", null, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamRead("prefix:key", "0-0", null, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamStreamReadGroup() + { + wrapper.StreamReadGroup("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamReadGroup("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamTrim() + { + wrapper.StreamTrim("key", 1000, true, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamTrim("prefix:key", 1000, true, CommandFlags.HighPriority)); + } + [Fact] public void StringAppend() { diff --git a/StackExchange.Redis.Tests/Streams.cs b/StackExchange.Redis.Tests/Streams.cs new file mode 100644 index 000000000..f9dd10b25 --- /dev/null +++ b/StackExchange.Redis.Tests/Streams.cs @@ -0,0 +1,1304 @@ +using System; +using System.Linq; +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests +{ + public class Streams : TestBase + { + public Streams(ITestOutputHelper output) : base(output) { } + + [Fact] + public void IsStreamType() + { + using (var conn = Create()) + { + var key = GetUniqueKey("type_check"); + + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + db.StreamAdd(key, "field1", "value1"); + + var keyType = db.KeyType(key); + + Assert.Equal(RedisType.Stream, keyType); + } + } + + [Fact] + public void StreamAddSinglePairWithAutoId() + { + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + var messageId = db.StreamAdd(GetUniqueKey("auto_id"), "field1", "value1"); + + Assert.True(messageId != RedisValue.Null && ((string)messageId).Length > 0); + } + } + + [Fact] + public void StreamAddMultipleValuePairsWithAutoId() + { + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var key = GetUniqueKey("multiple_value_pairs"); + + var fields = new NameValueEntry[] + { + new NameValueEntry("field1", "value1"), + new NameValueEntry("field2", "value2") + }; + + var db = conn.GetDatabase(); + var messageId = db.StreamAdd(key, fields); + + var entries = db.StreamRange(key); + + Assert.True(entries.Length == 1); + Assert.Equal(messageId, entries[0].Id); + Assert.True(entries[0].Values.Length == 2); + Assert.True(entries[0].Values[0].Name == "field1" && + entries[0].Values[0].Value == "value1"); + Assert.True(entries[0].Values[1].Name == "field2" && + entries[0].Values[1].Value == "value2"); + } + } + + [Fact] + public void StreamAddWithManualId() + { + var id = "42-0"; + var key = GetUniqueKey("manual_id"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + var messageId = db.StreamAdd(key, "field1", "value1", id); + + Assert.Equal(id, messageId); + } + } + + [Fact] + public void StreamAddMultipleValuePairsWithManualId() + { + var id = "42-0"; + var key = GetUniqueKey("manual_id_multiple_values"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var fields = new NameValueEntry[] + { + new NameValueEntry("field1", "value1"), + new NameValueEntry("field2", "value2") + }; + + var messageId = db.StreamAdd(key, fields, id); + var entries = db.StreamRange(key); + + Assert.Equal(id, messageId); + Assert.NotNull(entries); + Assert.True(entries.Length == 1); + Assert.Equal(id, entries[0].Id); + } + } + + [Fact] + public void StreamConsumerGroupWithNoConsumers() + { + var key = GetUniqueKey("group_with_no_consumers"); + var groupName = "test_group"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + // Create a stream + db.StreamAdd(key, "field1", "value1"); + + // Create a group + db.StreamCreateConsumerGroup(key, groupName, "0-0"); + + // Query redis for the group consumers, expect an empty list in response. + var consumers = db.StreamConsumerInfo(key, groupName); + + Assert.True(consumers.Length == 0); + } + } + + [Fact] + public void StreamCreateConsumerGroup() + { + var key = GetUniqueKey("group_create"); + var groupName = "test_group"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + // Create a stream + db.StreamAdd(key, "field1", "value1"); + + // Create a group + var result = db.StreamCreateConsumerGroup(key, groupName, "-"); + + Assert.True(result); + } + } + + [Fact] + public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse() + { + var key = GetUniqueKey("group_read"); + var groupName = "test_group"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + // Create a stream + db.StreamAdd(key, "field1", "value1"); + db.StreamAdd(key, "field2", "value2"); + + // Create a group. + db.StreamCreateConsumerGroup(key, groupName); + + // Read, expect no messages + var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0"); + + Assert.True(entries.Length == 0); + } + } + + [Fact] + public void StreamConsumerGroupReadFromStreamBeginning() + { + var key = GetUniqueKey("group_read_beginning"); + var groupName = "test_group"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + + db.StreamCreateConsumerGroup(key, groupName, "-"); + + var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0"); + + Assert.True(entries.Length == 2); + Assert.True(id1 == entries[0].Id); + Assert.True(id2 == entries[1].Id); + } + } + + [Fact] + public void StreamConsumerGroupReadFromStreamBeginningWithCount() + { + var key = GetUniqueKey("group_read_with_count"); + var groupName = "test_group"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, groupName, "-"); + + // Start reading after id1. + var entries = db.StreamReadGroup(key, groupName, "test_consumer", id1, 2); + + // Ensure we only received the requested count and that the IDs match the expected values. + Assert.True(entries.Length == 2); + Assert.True(id2 == entries[0].Id); + Assert.True(id3 == entries[1].Id); + } + } + + [Fact] + public void StreamConsumerGroupAcknowledgeMessage() + { + var key = GetUniqueKey("group_ack"); + var groupName = "test_group"; + var consumer = "test_consumer"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, groupName, "-"); + + // Read all 4 messages, they will be assigned to the consumer + var entries = db.StreamReadGroup(key, groupName, consumer, "0-0"); + + // Send XACK for 3 of the messages + + // Single message Id overload. + var oneAck = db.StreamAcknowledge(key, groupName, id1); + + // Multiple message Id overload. + var twoAck = db.StreamAcknowledge(key, groupName, new RedisValue[] { id3, id4 }); + + // Read the group again, it should only return the unacknowledged message. + var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, "0-0"); + + Assert.True(entries.Length == 4); + Assert.Equal(1, oneAck); + Assert.Equal(2, twoAck); + Assert.True(notAcknowledged.Length == 1); + Assert.Equal(id2, notAcknowledged[0].Id); + } + } + + [Fact] + public void StreamConsumerGroupClaimMessages() + { + var key = GetUniqueKey("group_claim"); + var groupName = "test_group"; + var consumer1 = "test_consumer_1"; + var consumer2 = "test_consumer_2"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, groupName, "0-0"); + + // Read a single message into the first consumer. + db.StreamReadGroup(key, groupName, consumer1, count: 1); + + // Read the remaining messages into the second consumer. + db.StreamReadGroup(key, groupName, consumer2); + + // Claim the 3 messages consumed by consumer2 for consumer1. + + // Get the pending messages for consumer2. + var pendingMessages = db.StreamPendingMessages(key, groupName, + 10, + consumer2); + + // Claim the messages for consumer1. + var messages = db.StreamClaim(key, + groupName, + consumer1, + 0, // Min message idle time + messageIds: pendingMessages.Select(pm => pm.MessageId).ToArray()); + + // Now see how many messages are pending for each consumer + var pendingSummary = db.StreamPending(key, groupName); + + Assert.NotNull(pendingSummary.Consumers); + Assert.True(pendingSummary.Consumers.Length == 1); + Assert.Equal(4, pendingSummary.Consumers[0].PendingMessageCount); + Assert.True(pendingMessages.Length == messages.Length); + } + } + + [Fact] + public void StreamConsumerGroupClaimMessagesReturningIds() + { + var key = GetUniqueKey("group_claim_view_ids"); + var groupName = "test_group"; + var consumer1 = "test_consumer_1"; + var consumer2 = "test_consumer_2"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, groupName, "-"); + + // Read a single message into the first consumer. + var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, "-", 1); + + // Read the remaining messages into the second consumer. + var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); + + // Claim the 3 messages consumed by consumer2 for consumer1. + + // Get the pending messages for consumer2. + var pendingMessages = db.StreamPendingMessages(key, groupName, + 10, + consumer2); + + // Claim the messages for consumer1. + var messageIds = db.StreamClaimIdsOnly(key, + groupName, + consumer1, + 0, // Min message idle time + messageIds: pendingMessages.Select(pm => pm.MessageId).ToArray()); + + // We should get an array of 3 message IDs. + Assert.Equal(3, messageIds.Length); + Assert.Equal(id2, messageIds[0]); + Assert.Equal(id3, messageIds[1]); + Assert.Equal(id4, messageIds[2]); + } + + } + + [Fact] + public void StreamConsumerGroupViewPendingInfoNoConsumers() + { + var key = GetUniqueKey("group_pending_info_no_consumers"); + var groupName = "test_group"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + + db.StreamCreateConsumerGroup(key, groupName, "-"); + + var pendingInfo = db.StreamPending(key, groupName); + + Assert.Equal(0, pendingInfo.PendingMessageCount); + Assert.True(pendingInfo.LowestPendingMessageId == RedisValue.Null); + Assert.True(pendingInfo.HighestPendingMessageId == RedisValue.Null); + Assert.NotNull(pendingInfo.Consumers); + Assert.True(pendingInfo.Consumers.Length == 0); + } + } + + [Fact] + public void StreamConsumerGroupViewPendingInfoWhenNothingPending() + { + var key = GetUniqueKey("group_pending_info_nothing_pending"); + var groupName = "test_group"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + + db.StreamCreateConsumerGroup(key, groupName, "0-0"); + + var pendingMessages = db.StreamPendingMessages(key, + groupName, + 10, + consumerName: RedisValue.Null); + + Assert.NotNull(pendingMessages); + Assert.True(pendingMessages.Length == 0); + } + } + + [Fact] + public void StreamConsumerGroupViewPendingInfoSummary() + { + var key = GetUniqueKey("group_pending_info"); + var groupName = "test_group"; + var consumer1 = "test_consumer_1"; + var consumer2 = "test_consumer_2"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, groupName, "-"); + + // Read a single message into the first consumer. + var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, "-", 1); + + // Read the remaining messages into the second consumer. + var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); + + var pendingInfo = db.StreamPending(key, groupName); + + Assert.Equal(4, pendingInfo.PendingMessageCount); + Assert.Equal(id1, pendingInfo.LowestPendingMessageId); + Assert.Equal(id4, pendingInfo.HighestPendingMessageId); + Assert.True(pendingInfo.Consumers.Length == 2); + + var consumer1Count = pendingInfo.Consumers.First(c => c.Name == consumer1).PendingMessageCount; + var consumer2Count = pendingInfo.Consumers.First(c => c.Name == consumer2).PendingMessageCount; + + Assert.Equal(1, consumer1Count); + Assert.Equal(3, consumer2Count); + } + } + + [Fact] + public void StreamConsumerGroupViewPendingMessageInfo() + { + var key = GetUniqueKey("group_pending_messages"); + var groupName = "test_group"; + var consumer1 = "test_consumer_1"; + var consumer2 = "test_consumer_2"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, groupName, "-"); + + // Read a single message into the first consumer. + var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1); + + // Read the remaining messages into the second consumer. + var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); + + // Get the pending info about the messages themselves. + var pendingMessageInfoList = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null); + + Assert.NotNull(pendingMessageInfoList); + Assert.Equal(4, pendingMessageInfoList.Length); + Assert.Equal(consumer1, pendingMessageInfoList[0].ConsumerName); + Assert.Equal(1, pendingMessageInfoList[0].DeliveryCount); + Assert.True((int)pendingMessageInfoList[0].IdleTimeInMilliseconds > 0); + Assert.Equal(id1, pendingMessageInfoList[0].MessageId); + } + } + + [Fact] + public void StreamConsumerGroupViewPendingMessageInfoForConsumer() + { + var key = GetUniqueKey("group_pending_for_consumer"); + var groupName = "test_group"; + var consumer1 = "test_consumer_1"; + var consumer2 = "test_consumer_2"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, groupName, "-"); + + // Read a single message into the first consumer. + var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, count: 1); + + // Read the remaining messages into the second consumer. + var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); + + // Get the pending info about the messages themselves. + var pendingMessageInfoList = db.StreamPendingMessages(key, + groupName, + 10, + consumer2); + + Assert.NotNull(pendingMessageInfoList); + Assert.Equal(3, pendingMessageInfoList.Length); + } + } + + [Fact] + public void StreamDeleteMessage() + { + var key = GetUniqueKey("delete_msg"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + var deletedCount = db.StreamDelete(key, new RedisValue[] { id3 }); + var messages = db.StreamRange(key, "-", "+"); + + Assert.Equal(1, deletedCount); + Assert.Equal(3, messages.Length); + } + } + + [Fact] + public void StreamDeleteMessages() + { + var key = GetUniqueKey("delete_msgs"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + var deletedCount = db.StreamDelete(key, new RedisValue[] { id2, id3 }, CommandFlags.None); + var messages = db.StreamRange(key, "-", "+"); + + Assert.Equal(2, deletedCount); + Assert.Equal(2, messages.Length); + } + } + + [Fact] + public void StreamGroupInfoGet() + { + var key = GetUniqueKey("group_info"); + var group1 = "test_group_1"; + var group2 = "test_group_2"; + var consumer1 = "test_consumer_1"; + var consumer2 = "test_consumer_2"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, group1, "-"); + db.StreamCreateConsumerGroup(key, group2, "-"); + + // Read a single message into the first consumer. + var consumer1Messages = db.StreamReadGroup(key, group1, consumer1, count: 1); + + // Read the remaining messages into the second consumer. + var consumer2Messages = db.StreamReadGroup(key, group2, consumer2); + + var groupInfoList = db.StreamGroupInfo(key); + + Assert.NotNull(groupInfoList); + Assert.Equal(2, groupInfoList.Length); + + Assert.Equal(group1, groupInfoList[0].Name); + Assert.Equal(1, groupInfoList[0].PendingMessageCount); + + Assert.Equal(group2, groupInfoList[1].Name); + Assert.Equal(4, groupInfoList[1].PendingMessageCount); + } + } + + [Fact] + public void StreamGroupConsumerInfoGet() + { + var key = GetUniqueKey("group_consumer_info"); + var group = "test_group"; + var consumer1 = "test_consumer_1"; + var consumer2 = "test_consumer_2"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, group, "-"); + db.StreamReadGroup(key, group, consumer1, count: 1); + db.StreamReadGroup(key, group, consumer2); + + var consumerInfoList = db.StreamConsumerInfo(key, group); + + Assert.NotNull(consumerInfoList); + Assert.Equal(2, consumerInfoList.Length); + + Assert.Equal(consumer1, consumerInfoList[0].Name); + Assert.Equal(consumer2, consumerInfoList[1].Name); + + Assert.Equal(1, consumerInfoList[0].PendingMessageCount); + Assert.Equal(3, consumerInfoList[1].PendingMessageCount); + } + } + + [Fact] + public void StreamInfoGet() + { + var key = GetUniqueKey("stream_info"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + var streamInfo = db.StreamInfo(key); + + Assert.Equal(4, streamInfo.Length); + Assert.True(streamInfo.RadixTreeKeys > 0); + Assert.True(streamInfo.RadixTreeNodes > 0); + Assert.Equal(id1, streamInfo.FirstEntry.Id); + Assert.Equal(id4, streamInfo.LastEntry.Id); + } + } + + [Fact] + public void StreamInfoGetWithEmptyStream() + { + var key = GetUniqueKey("stream_info_empty"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + // Add an entry and then delete it so the stream is empty, then run streaminfo + // to ensure it functions properly on an empty stream. Namely, the first-entry + // and last-entry messages should be null. + + var id = db.StreamAdd(key, "field1", "value1"); + db.StreamDelete(key, new RedisValue[] { id }); + + Assert.Equal(0, db.StreamLength(key)); + + var streamInfo = db.StreamInfo(key); + + Assert.True(streamInfo.FirstEntry.IsNull); + Assert.True(streamInfo.LastEntry.IsNull); + } + } + + [Fact] + public void StreamNoConsumerGroups() + { + var key = GetUniqueKey("stream_with_no_consumers"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + db.StreamAdd(key, "field1", "value1"); + + var groups = db.StreamGroupInfo(key); + + Assert.NotNull(groups); + Assert.True(groups.Length == 0); + } + } + + [Fact] + public void StreamPendingNoMessagesOrConsumers() + { + var key = GetUniqueKey("stream_pending_empty"); + var groupName = "test_group"; + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id = db.StreamAdd(key, "field1", "value1"); + db.StreamDelete(key, new RedisValue[] { id }); + + db.StreamCreateConsumerGroup(key, groupName, "0-0"); + + var pendingInfo = db.StreamPending(key, "test_group"); + + Assert.Equal(0, pendingInfo.PendingMessageCount); + Assert.Equal(RedisValue.Null, pendingInfo.LowestPendingMessageId); + Assert.Equal(RedisValue.Null, pendingInfo.HighestPendingMessageId); + Assert.NotNull(pendingInfo.Consumers); + Assert.True(pendingInfo.Consumers.Length == 0); + } + } + + [Fact] + public void StreamRead() + { + var key = GetUniqueKey("read"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + + // Read the entire stream from the beginning. + var entries = db.StreamRead(key, "0-0"); + + Assert.True(entries.Length == 3); + Assert.Equal(id1, entries[0].Id); + Assert.Equal(id2, entries[1].Id); + Assert.Equal(id3, entries[2].Id); + } + } + + [Fact] + public void StreamReadEmptyStream() + { + var key = GetUniqueKey("read_empty_stream"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + // Write to a stream to create the key. + var id1 = db.StreamAdd(key, "field1", "value1"); + + // Delete the key to empty the stream. + db.StreamDelete(key, new RedisValue[] { id1 }); + var len = db.StreamLength(key); + + // Read the entire stream from the beginning. + var entries = db.StreamRead(key, "0-0"); + + Assert.True(entries.Length == 0); + Assert.Equal(0, len); + } + } + + [Fact] + public void StreamReadEmptyStreams() + { + var key1 = GetUniqueKey("read_empty_stream_1"); + var key2 = GetUniqueKey("read_empty_stream_2"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + // Write to a stream to create the key. + var id1 = db.StreamAdd(key1, "field1", "value1"); + var id2 = db.StreamAdd(key2, "field2", "value2"); + + // Delete the key to empty the stream. + db.StreamDelete(key1, new RedisValue[] { id1 }); + db.StreamDelete(key2, new RedisValue[] { id2 }); + + var len1 = db.StreamLength(key1); + var len2 = db.StreamLength(key2); + + // Read the entire stream from the beginning. + var entries1 = db.StreamRead(key1, "0-0"); + var entries2 = db.StreamRead(key2, "0-0"); + + Assert.True(entries1.Length == 0); + Assert.True(entries2.Length == 0); + + Assert.Equal(0, len1); + Assert.Equal(0, len2); + } + } + + [Fact] + public void StreamReadExpectedExceptionInvalidCountMultipleStream() + { + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var streamPairs = new StreamIdPair[] + { + new StreamIdPair("key1", "0-0"), + new StreamIdPair("key2", "0-0") + }; + + + var db = conn.GetDatabase(); + Assert.Throws(() => db.StreamRead(streamPairs, 0)); + } + } + + [Fact] + public void StreamReadExpectedExceptionInvalidCountSingleStream() + { + var key = GetUniqueKey("read_exception_invalid_count_single"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + Assert.Throws(() => db.StreamRead(key, "0-0", 0)); + } + } + + [Fact] + public void StreamReadExpectedExceptionNullStreamList() + { + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + Assert.Throws(() => db.StreamRead(null)); + } + } + + [Fact] + public void StreamReadExpectedExceptionEmptyStreamList() + { + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var emptyList = new StreamIdPair[0]; + + Assert.Throws(() => db.StreamRead(emptyList)); + } + } + + [Fact] + public void StreamReadMultipleStreams() + { + var key1 = GetUniqueKey("read_multi_1"); + var key2 = GetUniqueKey("read_multi_2"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key1, "field1", "value1"); + var id2 = db.StreamAdd(key1, "fiedl2", "value2"); + var id3 = db.StreamAdd(key2, "field3", "value3"); + var id4 = db.StreamAdd(key2, "field4", "value4"); + + // Read from both streams at the same time. + var streamList = new StreamIdPair[2] + { + new StreamIdPair(key1, "0-0"), + new StreamIdPair(key2, "0-0") + }; + + var streams = db.StreamRead(streamList); + + Assert.True(streams.Length == 2); + + Assert.Equal(key1, streams[0].Key); + Assert.True(streams[0].Entries.Length == 2); + Assert.Equal(id1, streams[0].Entries[0].Id); + Assert.Equal(id2, streams[0].Entries[1].Id); + + Assert.Equal(key2, streams[1].Key); + Assert.True(streams[1].Entries.Length == 2); + Assert.Equal(id3, streams[1].Entries[0].Id); + Assert.Equal(id4, streams[1].Entries[1].Id); + } + } + + [Fact] + public void StreamReadMultipleStreamsWithCount() + { + var key1 = GetUniqueKey("read_multi_count_1"); + var key2 = GetUniqueKey("read_multi_count_2"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key1, "field1", "value1"); + var id2 = db.StreamAdd(key1, "fiedl2", "value2"); + var id3 = db.StreamAdd(key2, "field3", "value3"); + var id4 = db.StreamAdd(key2, "field4", "value4"); + + var streamList = new StreamIdPair[2] + { + new StreamIdPair(key1, "0-0"), + new StreamIdPair(key2, "0-0") + }; + + var streams = db.StreamRead(streamList, countPerStream: 1); + + // We should get both streams back. + Assert.True(streams.Length == 2); + + // Ensure we only got one message per stream. + Assert.True(streams[0].Entries.Length == 1); + Assert.True(streams[1].Entries.Length == 1); + + // Check the message IDs as well. + Assert.Equal(id1, streams[0].Entries[0].Id); + Assert.Equal(id3, streams[1].Entries[0].Id); + } + } + + [Fact] + public void StreamReadMultipleStreamsWithReadPastSecondStream() + { + var key1 = GetUniqueKey("read_multi_1"); + var key2 = GetUniqueKey("read_multi_2"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key1, "field1", "value1"); + var id2 = db.StreamAdd(key1, "fiedl2", "value2"); + var id3 = db.StreamAdd(key2, "field3", "value3"); + var id4 = db.StreamAdd(key2, "field4", "value4"); + + var streamList = new StreamIdPair[2] + { + new StreamIdPair(key1, "0-0"), + + // read past the end of stream # 2 + new StreamIdPair(key2, id4) + }; + + var streams = db.StreamRead(streamList); + + // We should only get the first stream back. + Assert.True(streams.Length == 1); + + Assert.Equal(key1, streams[0].Key); + Assert.True(streams[0].Entries.Length == 2); + } + } + + [Fact] + public void StreamReadMultipleStreamsWithEmptyResponse() + { + var key1 = GetUniqueKey("read_multi_1"); + var key2 = GetUniqueKey("read_multi_2"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key1, "field1", "value1"); + var id2 = db.StreamAdd(key1, "fiedl2", "value2"); + var id3 = db.StreamAdd(key2, "field3", "value3"); + var id4 = db.StreamAdd(key2, "field4", "value4"); + + var streamList = new StreamIdPair[] + { + // Read past the end of both streams. + new StreamIdPair(key1, id2), + new StreamIdPair(key2, id4) + }; + + var streams = db.StreamRead(streamList); + + // We expect an empty response. + Assert.True(streams.Length == 0); + } + } + + [Fact] + public void StreamReadPastEndOfStream() + { + var key = GetUniqueKey("read_empty"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + + // Read after the final ID in the stream, we expect an empty array as a response. + + var entries = db.StreamRead(key, id2); + + Assert.True(entries.Length == 0); + } + } + + [Fact] + public void StreamReadRange() + { + var key = GetUniqueKey("range"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + + var entries = db.StreamRange(key); + + Assert.Equal(2, entries.Length); + Assert.Equal(id1, entries[0].Id); + Assert.Equal(id2, entries[1].Id); + } + } + + [Fact] + public void StreamReadRangeOfEmptyStream() + { + var key = GetUniqueKey("range_empty"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + + var deleted = db.StreamDelete(key, new RedisValue[] { id1, id2 }); + + var entries = db.StreamRange(key, "-", "+"); + + Assert.Equal(2, deleted); + Assert.NotNull(entries); + Assert.True(entries.Length == 0); + } + } + + [Fact] + public void StreamReadRangeWithCount() + { + var key = GetUniqueKey("range_count"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + + var entries = db.StreamRange(key, count: 1); + + Assert.True(entries.Length == 1); + Assert.Equal(id1, entries[0].Id); + } + } + + [Fact] + public void StreamReadRangeReverse() + { + var key = GetUniqueKey("rangerev"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + + var entries = db.StreamRange(key, messageOrder: Order.Descending); + + Assert.True(entries.Length == 2); + Assert.Equal(id2, entries[0].Id); + Assert.Equal(id1, entries[1].Id); + } + } + + [Fact] + public void StreamReadRangeReverseWithCount() + { + var key = GetUniqueKey("rangerev_count"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + + var entries = db.StreamRange(key, count: 1, messageOrder: Order.Descending); + + Assert.True(entries.Length == 1); + Assert.Equal(id2, entries[0].Id); + } + } + + [Fact] + public void StreamReadWithAfterIdAndCount_1() + { + var key = GetUniqueKey("read"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + + // Only read a single item from the stream. + var entries = db.StreamRead(key, id1, 1); + + Assert.True(entries.Length == 1); + Assert.Equal(id2, entries[0].Id); + } + } + + [Fact] + public void StreamReadWithAfterIdAndCount_2() + { + var key = GetUniqueKey("read"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "fiedl2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + var id4 = db.StreamAdd(key, "field4", "value4"); + + // Read multiple items from the stream. + var entries = db.StreamRead(key, id1, 2); + + Assert.True(entries.Length == 2); + Assert.Equal(id2, entries[0].Id); + Assert.Equal(id3, entries[1].Id); + } + } + + [Fact] + public void StreamTrimLength() + { + var key = GetUniqueKey("trimlen"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + // Add a couple items and check length. + db.StreamAdd(key, "field1", "value1"); + db.StreamAdd(key, "fiedl2", "value2"); + db.StreamAdd(key, "field3", "value3"); + db.StreamAdd(key, "field4", "value4"); + + var numRemoved = db.StreamTrim(key, 1); + var len = db.StreamLength(key); + + Assert.Equal(3, numRemoved); + Assert.Equal(1, len); + } + } + + [Fact] + public void StreamVerifyLength() + { + var key = GetUniqueKey("len"); + + using (var conn = Create()) + { + Skip.IfMissingFeature(conn, nameof(RedisFeatures.Streams), r => r.Streams); + + var db = conn.GetDatabase(); + + // Add a couple items and check length. + db.StreamAdd(key, "field1", "value1"); + db.StreamAdd(key, "fiedl2", "value2"); + + var len = db.StreamLength(key); + + Assert.Equal(2, len); + } + } + + private string GetUniqueKey(string type) => $"{type}_stream_{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}"; + } +} diff --git a/StackExchange.Redis.Tests/WrapperBaseTests.cs b/StackExchange.Redis.Tests/WrapperBaseTests.cs index ed6fb3722..0bf14087d 100644 --- a/StackExchange.Redis.Tests/WrapperBaseTests.cs +++ b/StackExchange.Redis.Tests/WrapperBaseTests.cs @@ -740,6 +740,145 @@ public void SortedSetScoreAsync() mock.Verify(_ => _.SortedSetScoreAsync("prefix:key", "member", CommandFlags.HighPriority)); } + [Fact] + public void StreamAcknowledgeAsync_1() + { + wrapper.StreamAcknowledgeAsync("key", "group", "0-0", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamAcknowledgeAsync("prefix:key", "group", "0-0", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamAcknowledgeAsync_2() + { + var messageIds = new RedisValue[] { "0-0", "0-1", "0-2" }; + wrapper.StreamAcknowledgeAsync("key", "group", messageIds, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamAcknowledgeAsync("prefix:key", "group", messageIds, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamAddAsync_1() + { + wrapper.StreamAddAsync("key", "field1", "value1", "*", 1000, true, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamAddAsync("prefix:key", "field1", "value1", "*", 1000, true, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamAddAsync_2() + { + var fields = new NameValueEntry[0]; + wrapper.StreamAddAsync("key", fields, "*", 1000, true, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamAddAsync("prefix:key", fields, "*", 1000, true, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamClaimMessagesAsync() + { + var messageIds = new RedisValue[0]; + wrapper.StreamClaimAsync("key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamClaimAsync("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamClaimMessagesReturningIdsAsync() + { + var messageIds = new RedisValue[0]; + wrapper.StreamClaimIdsOnlyAsync("key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamClaimIdsOnlyAsync("prefix:key", "group", "consumer", 1000, messageIds, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamConsumerInfoGetAsync() + { + wrapper.StreamConsumerInfoAsync("key", "group", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamConsumerInfoAsync("prefix:key", "group", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamCreateConsumerGroupAsync() + { + wrapper.StreamCreateConsumerGroupAsync("key", "group", "0-0", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamCreateConsumerGroupAsync("prefix:key", "group", "0-0", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamGroupInfoGetAsync() + { + wrapper.StreamGroupInfoAsync("key", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamGroupInfoAsync("prefix:key", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamInfoGetAsync() + { + wrapper.StreamInfoAsync("key", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamInfoAsync("prefix:key", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamLengthAsync() + { + wrapper.StreamLengthAsync("key", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamLengthAsync("prefix:key", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamMessagesDeleteAsync() + { + var messageIds = new RedisValue[0] { }; + wrapper.StreamDeleteAsync("key", messageIds, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamDeleteAsync("prefix:key", messageIds, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamPendingInfoGetAsync() + { + wrapper.StreamPendingAsync("key", "group", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamPendingAsync("prefix:key", "group", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamPendingMessageInfoGetAsync() + { + wrapper.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.HighPriority); + mock.Verify(_ => _.StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.HighPriority)); + } + + [Fact] + public void StreamRangeAsync() + { + wrapper.StreamRangeAsync("key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamRangeAsync("prefix:key", "-", "+", null, Order.Ascending, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamReadAsync_1() + { + var keysAndIds = new StreamIdPair[0] { }; + wrapper.StreamReadAsync(keysAndIds, null, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamReadAsync(keysAndIds, null, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamReadAsync_2() + { + wrapper.StreamReadAsync("key", "0-0", null, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamReadAsync("prefix:key", "0-0", null, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamReadGroupAsync() + { + wrapper.StreamReadGroupAsync("key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamReadGroupAsync("prefix:key", "group", "consumer", "0-0", 10, CommandFlags.HighPriority)); + } + + [Fact] + public void StreamTrimAsync() + { + wrapper.StreamTrimAsync("key", 1000, true, CommandFlags.HighPriority); + mock.Verify(_ => _.StreamTrimAsync("prefix:key", 1000, true, CommandFlags.HighPriority)); + } + [Fact] public void StringAppendAsync() { diff --git a/StackExchange.Redis/StackExchange/Redis/Enums/RedisCommand.cs b/StackExchange.Redis/StackExchange/Redis/Enums/RedisCommand.cs index 16fb9f736..5e6a66ada 100644 --- a/StackExchange.Redis/StackExchange/Redis/Enums/RedisCommand.cs +++ b/StackExchange.Redis/StackExchange/Redis/Enums/RedisCommand.cs @@ -163,6 +163,20 @@ internal enum RedisCommand WATCH, + XACK, + XADD, + XCLAIM, + XDEL, + XGROUP, + XINFO, + XLEN, + XPENDING, + XRANGE, + XREAD, + XREADGROUP, + XREVRANGE, + XTRIM, + ZADD, ZCARD, ZCOUNT, diff --git a/StackExchange.Redis/StackExchange/Redis/Enums/RedisType.cs b/StackExchange.Redis/StackExchange/Redis/Enums/RedisType.cs index 240187c10..54d49cd03 100644 --- a/StackExchange.Redis/StackExchange/Redis/Enums/RedisType.cs +++ b/StackExchange.Redis/StackExchange/Redis/Enums/RedisType.cs @@ -38,6 +38,13 @@ public enum RedisType /// https://redis.io/commands#hash Hash, /// + /// A Redis Stream is a data structure which models the behavior of an append only log but it has more + /// advanced features for manipulating the data contained within the stream. Each entry in a + /// stream contains a unique message ID and a list of name/value pairs containing the entry's data. + /// + /// https://redis.io/commands#stream + Stream, + /// /// The data-type was not recognised by the client library /// Unknown, diff --git a/StackExchange.Redis/StackExchange/Redis/ExtensionMethods.cs b/StackExchange.Redis/StackExchange/Redis/ExtensionMethods.cs index 0eac4dfbd..ea3c1ec5b 100644 --- a/StackExchange.Redis/StackExchange/Redis/ExtensionMethods.cs +++ b/StackExchange.Redis/StackExchange/Redis/ExtensionMethods.cs @@ -122,6 +122,18 @@ public static Dictionary ToDictionary(this KeyValuePair + /// Create an array of RedisValues from an array of strings. + /// + /// The string array to convert to RedisValues + public static RedisValue[] ToRedisValueArray(this string[] values) + { + if (values == null) return null; + if (values.Length == 0) return nixValues; + return Array.ConvertAll(values, x => (RedisValue)x); + } + private static readonly string[] nix = new string[0]; /// /// Create an array of strings from an array of values diff --git a/StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabase.cs b/StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabase.cs index f98178559..8511570f7 100644 --- a/StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabase.cs +++ b/StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabase.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Net; @@ -1389,6 +1389,225 @@ IEnumerable SortedSetScan(RedisKey key, /// https://redis.io/commands/zscore double? SortedSetScore(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None); + /// + /// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group that received the message. + /// The ID of the message to acknowledge. + /// The flags to use for this operation. + /// The number of messages acknowledged. + /// https://redis.io/topics/streams-intro + long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None); + + /// + /// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group that received the message. + /// The IDs of the messages to acknowledge. + /// The flags to use for this operation. + /// The number of messages acknowledged. + /// https://redis.io/topics/streams-intro + long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + + /// + /// Adds an entry using the specified values to the given stream key. If key does not exist, a new key holding a stream is created. The command returns the ID of the newly created stream entry. + /// + /// The key of the stream. + /// The field name for the stream entry. + /// The value to set in the stream entry. + /// The ID to assign to the stream entry, defaults to an auto-generated ID ("*"). + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// The flags to use for this operation. + /// The ID of the newly created message. + /// https://redis.io/commands/xadd + RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + + /// + /// Adds an entry using the specified values to the given stream key. If key does not exist, a new key holding a stream is created. The command returns the ID of the newly created stream entry. + /// + /// The key of the stream. + /// The fields and their associated values to set in the stream entry. + /// The ID to assign to the stream entry, defaults to an auto-generated ID ("*"). + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// The flags to use for this operation. + /// The ID of the newly created message. + /// https://redis.io/commands/xadd + RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the complete message for the claimed message(s). + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the given message(s). + /// The minimum message idle time to allow the reassignment of the message(s). + /// The IDs of the messages to claim for the given consumer. + /// The flags to use for this operation. + /// The messages successfully claimed by the given consumer. + /// https://redis.io/topics/streams-intro + RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s). + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the given message(s). + /// The minimum message idle time to allow the reassignment of the message(s). + /// The IDs of the messages to claim for the given consumer. + /// The flags to use for this operation. + /// The message IDs for the messages successfully claimed by the given consumer. + /// https://redis.io/topics/streams-intro + RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + + /// + /// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group". + /// + /// The key of the stream. + /// The consumer group name. + /// The flags to use for this operation. + /// An instance of for each of the consumer group's consumers. + /// https://redis.io/topics/streams-intro + StreamConsumerInfo[] StreamConsumerInfo(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); + + /// + /// Create a consumer group for the given stream. + /// + /// The key of the stream. + /// The name of the group to create. + /// The beginning position in the stream from which to read. If null, the method will send the option ("$") to only read new messages. + /// The flags to use for this operation. + /// True if the group was created. + /// https://redis.io/topics/streams-intro + bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None); + + /// + /// Delete messages in the stream. This method does not delete the stream. + /// + /// The key of the stream. + /// The IDs of the messages to delete. + /// The flags to use for this operation. + /// Returns the number of messages successfully deleted from the stream. + /// https://redis.io/topics/streams-intro + long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + + /// + /// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key". + /// + /// The key of the stream. + /// The flags to use for this operation. + /// An instance of for each of the stream's groups. + /// https://redis.io/topics/streams-intro + StreamGroupInfo[] StreamGroupInfo(RedisKey key, CommandFlags flags = CommandFlags.None); + + /// + /// Retrieve information about the given stream. This is the equivalent of calling "XINFO STREAM key". + /// + /// The key of the stream. + /// The flags to use for this operation. + /// A instance with information about the stream. + /// https://redis.io/topics/streams-intro + StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None); + + /// + /// Return the number of entries in a stream. + /// + /// The key of the stream. + /// The flags to use for this operation. + /// The number of entries inside the given stream. + /// https://redis.io/commands/xlen + long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None); + + /// + /// View information about pending messages for a stream. A pending message is a message read using StreamReadGroup (XREADGROUP) but not yet acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group + /// The flags to use for this operation. + /// An instance of . contains the number of pending messages, the highest and lowest ID of the pending messages, and the consumers with their pending message count. + /// The equivalent of calling XPENDING key group. + /// https://redis.io/commands/xpending + StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); + + /// + /// View information about each pending message. + /// + /// The key of the stream. + /// The name of the consumer group. + /// The maximum number of pending messages to return. + /// The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers. + /// The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream. + /// The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream. + /// The flags to use for this operation. + /// An instance of for each pending message. + /// Equivalent of calling XPENDING key group start-id end-id count consumer-name. + /// https://redis.io/commands/xpending + StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None); + + /// + /// Read a stream using the given range of IDs. + /// + /// The key of the stream. + /// The minimum ID from which to read the stream. The method will default to reading from the beginning of the stream. + /// The maximum ID to read to within the stream. The method will default to reading to the end of the stream. + /// The maximum number of messages to return. + /// The order of the messages. will execute XRANGE and wil execute XREVRANGE. + /// The flags to use for this operation. + /// Returns an instance of for each message returned. + /// https://redis.io/commands/xrange + RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None); + + /// + /// Read from a single stream. + /// + /// The key of the stream. + /// The position from within the stream to begin reading. + /// The maximum number of messages to return. + /// The flags to use for this operation. + /// Returns an instance of for each message returned. + /// Equivalent of calling XREAD COUNT num STREAMS key id. + /// https://redis.io/commands/xread + RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None); + + /// + /// Read from multiple streams. + /// + /// The list of streams and the ID from which to begin reading for each stream. + /// The maximum number of messages to return from each stream. + /// The flags to use for this operation. + /// An instance of for each stream. + /// Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2. + /// https://redis.io/commands/xread + RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None); + + /// + /// Read messages from a stream and an associated consumer group. + /// + /// The key of the stream. + /// The name of the consumer group. + /// The consumer name. + /// The ID from within the stream to begin reading. If null, the method will send the option (">") to only read new, previously undelivered messages. + /// The maximum number of messages to return. + /// The flags to use for this operation. + /// Returns an instance of for each message returned. + /// https://redis.io/commands/xreadgroup + RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None); + + /// + /// Trim the stream to a specified maximum length. + /// + /// The key of the stream. + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// The flags to use for this operation. + /// The number of messages removed from the stream. + /// https://redis.io/topics/streams-intro + long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + /// /// If key already exists and is a string, this command appends the value at the end of the string. If key does not exist it is created and set as an empty string, /// so APPEND will be similar to SET in this special case. diff --git a/StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabaseAsync.cs b/StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabaseAsync.cs index ee9affec5..438079abe 100644 --- a/StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabaseAsync.cs +++ b/StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabaseAsync.cs @@ -1299,6 +1299,225 @@ Task SortedSetRangeByValueAsync(RedisKey key, /// https://redis.io/commands/zscore Task SortedSetScoreAsync(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None); + /// + /// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group that received the message. + /// The ID of the message to acknowledge. + /// The flags to use for this operation. + /// The number of messages acknowledged. + /// https://redis.io/topics/streams-intro + Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None); + + /// + /// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group that received the message. + /// The IDs of the messages to acknowledge. + /// The flags to use for this operation. + /// The number of messages acknowledged. + /// https://redis.io/topics/streams-intro + Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + + /// + /// Adds an entry using the specified values to the given stream key. If key does not exist, a new key holding a stream is created. The command returns the ID of the newly created stream entry. + /// + /// The key of the stream. + /// The field name for the stream entry. + /// The value to set in the stream entry. + /// The ID to assign to the stream entry, defaults to an auto-generated ID ("*"). + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// The flags to use for this operation. + /// The ID of the newly created message. + /// https://redis.io/commands/xadd + Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + + /// + /// Adds an entry using the specified values to the given stream key. If key does not exist, a new key holding a stream is created. The command returns the ID of the newly created stream entry. + /// + /// The key of the stream. + /// The fields and their associated values to set in the stream entry. + /// The ID to assign to the stream entry, defaults to an auto-generated ID ("*"). + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// The flags to use for this operation. + /// The ID of the newly created message. + /// https://redis.io/commands/xadd + Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the complete message for the claimed message(s). + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the given messages. + /// The minimum message idle time to allow the reassignment of the message(s). + /// The IDs of the messages to claim for the given consumer. + /// The flags to use for this operation. + /// The messages successfully claimed by the given consumer. + /// https://redis.io/topics/streams-intro + Task StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + + /// + /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. This method returns the IDs for the claimed message(s). + /// + /// The key of the stream. + /// The consumer group. + /// The consumer claiming the given message(s). + /// The minimum message idle time to allow the reassignment of the message(s). + /// The IDs of the messages to claim for the given consumer. + /// The flags to use for this operation. + /// The message IDs for the messages successfully claimed by the given consumer. + /// https://redis.io/topics/streams-intro + Task StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + + /// + /// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group". + /// + /// The key of the stream. + /// The consumer group name. + /// The flags to use for this operation. + /// An instance of for each of the consumer group's consumers. + /// https://redis.io/topics/streams-intro + Task StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); + + /// + /// Create a consumer group for the given stream. + /// + /// The key of the stream. + /// The name of the group to create. + /// The beginning position in the stream from which to read. If null, the method will send the option ("$") to only read new messages. + /// The flags to use for this operation. + /// True if the group was created. + /// https://redis.io/topics/streams-intro + Task StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None); + + /// + /// Delete messages in the stream. This method does not delete the stream. + /// + /// The key of the stream. + /// The IDs of the messages to delete. + /// The flags to use for this operation. + /// Returns the number of messages successfully deleted from the stream. + /// https://redis.io/topics/streams-intro + Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + + /// + /// Retrieve information about the groups created for the given stream. This is the equivalent of calling "XINFO GROUPS key". + /// + /// The key of the stream. + /// The flags to use for this operation. + /// An instance of for each of the stream's groups. + /// https://redis.io/topics/streams-intro + Task StreamGroupInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None); + + /// + /// Retrieve information about the given stream. This is the equivalent of calling "XINFO STREAM key". + /// + /// The key of the stream. + /// The flags to use for this operation. + /// A instance with information about the stream. + /// https://redis.io/topics/streams-intro + Task StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None); + + /// + /// Return the number of entries in a stream. + /// + /// The key of the stream. + /// The flags to use for this operation. + /// The number of entries inside the given stream. + /// https://redis.io/commands/xlen + Task StreamLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None); + + /// + /// View information about pending messages for a stream. A pending message is a message read using StreamReadGroup (XREADGROUP) but not yet acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group + /// The flags to use for this operation. + /// An instance of . contains the number of pending messages, the highest and lowest ID of the pending messages, and the consumers with their pending message count. + /// The equivalent of calling XPENDING key group. + /// https://redis.io/commands/xpending + Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); + + /// + /// View information about each pending message. + /// + /// The key of the stream. + /// The name of the consumer group. + /// The maximum number of pending messages to return. + /// The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers. + /// The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream. + /// The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream. + /// The flags to use for this operation. + /// An instance of for each pending message. + /// Equivalent of calling XPENDING key group start-id end-id count consumer-name. + /// https://redis.io/commands/xpending + Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None); + + /// + /// Read a stream using the given range of IDs. + /// + /// The key of the stream. + /// The minimum ID from which to read the stream. The method will default to reading from the beginning of the stream. + /// The maximum ID to read to within the stream. The method will default to reading to the end of the stream. + /// The maximum number of messages to return. + /// The order of the messages. will execute XRANGE and wil execute XREVRANGE. + /// The flags to use for this operation. + /// Returns an instance of for each message returned. + /// https://redis.io/commands/xrange + Task StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None); + + /// + /// Read from a single stream. + /// + /// The key of the stream. + /// The message ID from within the stream to begin reading. + /// The maximum number of messages to return. + /// The flags to use for this operation. + /// Returns an instance of for each message returned. + /// Equivalent of calling XREAD COUNT num STREAMS key id. + /// https://redis.io/commands/xread + Task StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None); + + /// + /// Read from multiple streams. + /// + /// The list of streams and the ID from which to begin reading for each stream. + /// The maximum number of messages to return from each stream. + /// The flags to use for this operation. + /// An instance of for each stream. + /// Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2. + /// https://redis.io/commands/xread + Task StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None); + + /// + /// Read messages from a stream and an associated consumer group. + /// + /// The key of the stream. + /// The name of the consumer group. + /// The consumer name. + /// The ID from within the stream to begin reading. If null, the method will send the option (">") to only read new, previously undelivered messages. + /// The maximum number of messages to return. + /// The flags to use for this operation. + /// Returns an instance of for each message returned. + /// https://redis.io/commands/xreadgroup + Task StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None); + + /// + /// Trim the stream to a specified maximum length. + /// + /// The key of the stream. + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// The flags to use for this operation. + /// The number of messages removed from the stream. + /// https://redis.io/topics/streams-intro + Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + /// /// If key already exists and is a string, this command appends the value at the end of the string. If key does not exist it is created and set as an empty string, /// so APPEND will be similar to SET in this special case. diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/DatabaseWrapper.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/DatabaseWrapper.cs index f5ec1177f..34a96c2b5 100644 --- a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/DatabaseWrapper.cs +++ b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/DatabaseWrapper.cs @@ -587,6 +587,101 @@ public long SortedSetRemoveRangeByValue(RedisKey key, RedisValue min, RedisValue return Inner.SortedSetScore(ToInner(key), member, flags); } + public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamAcknowledge(ToInner(key), groupName, messageId, flags); + } + + public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamAcknowledge(ToInner(key), groupName, messageIds, flags); + } + + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags); + } + + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); + } + + public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); + } + + public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamClaimIdsOnly(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); + } + + public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamCreateConsumerGroup(ToInner(key), groupName, readFrom, flags); + } + + public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamInfo(ToInner(key), flags); + } + + public StreamGroupInfo[] StreamGroupInfo(RedisKey key, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamGroupInfo(ToInner(key), flags); + } + + public StreamConsumerInfo[] StreamConsumerInfo(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamConsumerInfo(ToInner(key), groupName, flags); + } + + public long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamLength(ToInner(key), flags); + } + + public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamDelete(ToInner(key), messageIds, flags); + } + + public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamPending(ToInner(key), groupName, flags); + } + + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags); + } + + public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamRange(ToInner(key), minId, maxId, count, order, flags); + } + + public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamRead(ToInner(key), afterId, count, flags); + } + + public RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamRead(streamIdPairs, countPerStream, flags); + } + + public RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamReadGroup(ToInner(key), groupName, consumerName, readFromId, count, flags); + } + + public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags); + } + public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) { return Inner.StringAppend(ToInner(key), value, flags); diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/WrapperBase.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/WrapperBase.cs index fa6cb6182..3a5b1e58a 100644 --- a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/WrapperBase.cs +++ b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/WrapperBase.cs @@ -566,6 +566,101 @@ public Task SortedSetRemoveRangeByValueAsync(RedisKey key, RedisValue min, return Inner.SortedSetScoreAsync(ToInner(key), member, flags); } + public Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamAcknowledgeAsync(ToInner(key), groupName, messageId, flags); + } + + public Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamAcknowledgeAsync(ToInner(key), groupName, messageIds, flags); + } + + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags); + } + + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); + } + + public Task StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); + } + + public Task StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamClaimIdsOnlyAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, messageIds, flags); + } + + public Task StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamCreateConsumerGroupAsync(ToInner(key), groupName, readFrom, flags); + } + + public Task StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamInfoAsync(ToInner(key), flags); + } + + public Task StreamGroupInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamGroupInfoAsync(ToInner(key), flags); + } + + public Task StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamConsumerInfoAsync(ToInner(key), groupName, flags); + } + + public Task StreamLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamLengthAsync(ToInner(key), flags); + } + + public Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamDeleteAsync(ToInner(key), messageIds, flags); + } + + public Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamPendingAsync(ToInner(key), groupName, flags); + } + + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags); + } + + public Task StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, order, flags); + } + + public Task StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamReadAsync(ToInner(key), afterId, count, flags); + } + + public Task StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamReadAsync(streamIdPairs, countPerStream, flags); + } + + public Task StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, readFromId, count, flags); + } + + public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + return Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags); + } + public Task StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) { return Inner.StringAppendAsync(ToInner(key), value, flags); diff --git a/StackExchange.Redis/StackExchange/Redis/NameValueEntry.cs b/StackExchange.Redis/StackExchange/Redis/NameValueEntry.cs new file mode 100644 index 000000000..ea091112b --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/NameValueEntry.cs @@ -0,0 +1,84 @@ +using System; +using System.Collections.Generic; + +namespace StackExchange.Redis +{ + /// + /// Describes a value contained in a stream (a name/value pair). + /// + public struct NameValueEntry : IEquatable + { + internal readonly RedisValue name, value; + + /// + /// Initializes a value. + /// + /// The name for this entry. + /// The value for this entry. + public NameValueEntry(RedisValue name, RedisValue value) + { + this.name = name; + this.value = value; + } + + /// + /// The name of the field. + /// + public RedisValue Name => name; + + /// + /// The value of the field. + /// + public RedisValue Value => value; + + /// + /// Converts to a key/value pair + /// + /// The to create a from. + public static implicit operator KeyValuePair(NameValueEntry value) => + new KeyValuePair(value.name, value.value); + + /// + /// Converts from a key/value pair + /// + /// The to get a from. + public static implicit operator NameValueEntry(KeyValuePair value) => + new NameValueEntry(value.Key, value.Value); + + /// + /// See Object.ToString() + /// + public override string ToString() => name + ": " + value; + + /// + /// See Object.GetHashCode() + /// + public override int GetHashCode() => name.GetHashCode() ^ value.GetHashCode(); + + /// + /// Compares two values for equality. + /// + /// The to compare to. + public override bool Equals(object obj) => obj is NameValueEntry heObj && Equals(heObj); + + /// + /// Compares two values for equality. + /// + /// The to compare to. + public bool Equals(NameValueEntry other) => name == other.name && value == other.value; + + /// + /// Compares two values for equality + /// + /// The first to compare. + /// The second to compare. + public static bool operator ==(NameValueEntry x, NameValueEntry y) => x.name == y.name && x.value == y.value; + + /// + /// Compares two values for non-equality + /// + /// The first to compare. + /// The second to compare. + public static bool operator !=(NameValueEntry x, NameValueEntry y) => x.name != y.name || x.value != y.value; + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/RawResult.cs b/StackExchange.Redis/StackExchange/Redis/RawResult.cs index f4c268e7d..98a9e19ca 100644 --- a/StackExchange.Redis/StackExchange/Redis/RawResult.cs +++ b/StackExchange.Redis/StackExchange/Redis/RawResult.cs @@ -7,6 +7,9 @@ internal struct RawResult { public static readonly RawResult EmptyArray = new RawResult(new RawResult[0]); public static readonly RawResult Nil = new RawResult(); + + public static RawResult CreateMultiBulk(params RawResult[] results) => new RawResult(results); + private static readonly byte[] emptyBlob = new byte[0]; private readonly int offset, count; private readonly Array arr; diff --git a/StackExchange.Redis/StackExchange/Redis/RedisDatabase.cs b/StackExchange.Redis/StackExchange/Redis/RedisDatabase.cs index a5016a445..7d17fb0f6 100644 --- a/StackExchange.Redis/StackExchange/Redis/RedisDatabase.cs +++ b/StackExchange.Redis/StackExchange/Redis/RedisDatabase.cs @@ -1579,6 +1579,334 @@ IEnumerable IDatabase.SortedSetScan(RedisKey key, RedisValue pat return ExecuteAsync(msg, ResultProcessor.NullableDouble); } + public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAcknowledgeMessage(key, groupName, messageId, flags); + return ExecuteSync(msg, ResultProcessor.Int64); + } + + public Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAcknowledgeMessage(key, groupName, messageId, flags); + return ExecuteAsync(msg, ResultProcessor.Int64); + } + + public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAcknowledgeMessage(key, groupName, messageIds, flags); + return ExecuteSync(msg, ResultProcessor.Int64); + } + + public Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAcknowledgeMessage(key, groupName, messageIds, flags); + return ExecuteAsync(msg, ResultProcessor.Int64); + } + + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAddMessage(key, + messageId ?? StreamConstants.AutoGeneratedId, + maxLength, + useApproximateMaxLength, + new NameValueEntry(streamField, streamValue), + flags); + + return ExecuteSync(msg, ResultProcessor.RedisValue); + } + + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAddMessage(key, + messageId ?? StreamConstants.AutoGeneratedId, + maxLength, + useApproximateMaxLength, + new NameValueEntry(streamField, streamValue), + flags); + + return ExecuteAsync(msg, ResultProcessor.RedisValue); + } + + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAddMessage(key, + messageId ?? StreamConstants.AutoGeneratedId, + maxLength, + useApproximateMaxLength, + streamPairs, + flags); + + return ExecuteSync(msg, ResultProcessor.RedisValue); + } + + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAddMessage(key, + messageId ?? StreamConstants.AutoGeneratedId, + maxLength, + useApproximateMaxLength, + streamPairs, + flags); + + return ExecuteAsync(msg, ResultProcessor.RedisValue); + } + + public RedisStreamEntry[] StreamClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamClaimMessage(key, + consumerGroup, + claimingConsumer, + minIdleTimeInMs, + messageIds, + returnJustIds: false, + flags: flags); + + return ExecuteSync(msg, ResultProcessor.SingleStream); + } + + public Task StreamClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamClaimMessage(key, + consumerGroup, + claimingConsumer, + minIdleTimeInMs, + messageIds, + returnJustIds: false, + flags: flags); + + return ExecuteAsync(msg, ResultProcessor.SingleStream); + } + + public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamClaimMessage(key, + consumerGroup, + claimingConsumer, + minIdleTimeInMs, + messageIds, + returnJustIds: true, + flags: flags); + + return ExecuteSync(msg, ResultProcessor.RedisValueArray); + } + + public Task StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamClaimMessage(key, + consumerGroup, + claimingConsumer, + minIdleTimeInMs, + messageIds, + returnJustIds: true, + flags: flags); + + return ExecuteAsync(msg, ResultProcessor.RedisValueArray); + } + + public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, + flags, + RedisCommand.XGROUP, + new RedisValue[] + { + StreamConstants.Create, + key.AsRedisValue(), + groupName, + readFrom ?? StreamConstants.NewMessages + }); + + return ExecuteSync(msg, ResultProcessor.Boolean); + } + + public Task StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? readFrom = null, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, + flags, + RedisCommand.XGROUP, + new RedisValue[] + { + StreamConstants.Create, + key.AsRedisValue(), + groupName, + readFrom ?? StreamConstants.NewMessages + }); + + return ExecuteAsync(msg, ResultProcessor.Boolean); + } + + public StreamConsumerInfo[] StreamConsumerInfo(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, + flags, + RedisCommand.XINFO, + new RedisValue[] + { + StreamConstants.Consumers, + key.AsRedisValue(), + groupName + }); + + return ExecuteSync(msg, ResultProcessor.StreamConsumerInfo); + } + + public Task StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, + flags, + RedisCommand.XINFO, + new RedisValue[] + { + StreamConstants.Consumers, + key.AsRedisValue(), + groupName + }); + + return ExecuteAsync(msg, ResultProcessor.StreamConsumerInfo); + } + + public StreamGroupInfo[] StreamGroupInfo(RedisKey key, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XINFO, StreamConstants.Groups, key); + return ExecuteSync(msg, ResultProcessor.StreamGroupInfo); + } + + public Task StreamGroupInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XINFO, StreamConstants.Groups, key); + return ExecuteAsync(msg, ResultProcessor.StreamGroupInfo); + } + + public StreamInfo StreamInfo(RedisKey key, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XINFO, StreamConstants.Stream, key); + return ExecuteSync(msg, ResultProcessor.StreamInfo); + } + + public Task StreamInfoAsync(RedisKey key, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XINFO, StreamConstants.Stream, key); + return ExecuteAsync(msg, ResultProcessor.StreamInfo); + } + + public long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XLEN, key); + return ExecuteSync(msg, ResultProcessor.Int64); + } + + public Task StreamLengthAsync(RedisKey key, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XLEN, key); + return ExecuteAsync(msg, ResultProcessor.Int64); + } + + public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, + flags, + RedisCommand.XDEL, + key, + messageIds); + + return ExecuteSync(msg, ResultProcessor.Int64); + } + + public Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, + flags, + RedisCommand.XDEL, + key, + messageIds); + + return ExecuteAsync(msg, ResultProcessor.Int64); + } + + public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XPENDING, key, groupName); + return ExecuteSync(msg, ResultProcessor.StreamPendingInfo); + } + + public Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XPENDING, key, groupName); + return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo); + } + + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags); + return ExecuteSync(msg, ResultProcessor.StreamPendingMessages); + } + + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags); + return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages); + } + + public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamRangeMessage(key, minId, maxId, count, messageOrder, flags); + return ExecuteSync(msg, ResultProcessor.SingleStream); + } + + public Task StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamRangeMessage(key, minId, maxId, count, messageOrder, flags); + return ExecuteAsync(msg, ResultProcessor.SingleStream); + } + + public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetSingleStreamReadMessage(key, afterId, count, flags); + return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip); + } + + public Task StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetSingleStreamReadMessage(key, afterId, count, flags); + return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip); + } + + public RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetMultiStreamReadMessage(streamIdPairs, countPerStream, flags); + return ExecuteSync(msg, ResultProcessor.MultiStream); + } + + public Task StreamReadAsync(StreamIdPair[] streamIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetMultiStreamReadMessage(streamIdPairs, countPerStream, flags); + return ExecuteAsync(msg, ResultProcessor.MultiStream); + } + + public RedisStreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamReadGroupMessage(key, groupName, consumerName, readFromId, count, flags); + return ExecuteSync(msg, ResultProcessor.SingleStreamWithNameSkip); + } + + public Task StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId = null, int? count = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamReadGroupMessage(key, groupName, consumerName, readFromId, count, flags); + return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip); + } + + public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags); + return ExecuteSync(msg, ResultProcessor.Int64); + } + + public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags); + return ExecuteAsync(msg, ResultProcessor.Int64); + } + public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) { var msg = Message.Create(Database, flags, RedisCommand.APPEND, key, value); @@ -1936,6 +2264,62 @@ private RedisValue GetLexRange(RedisValue value, Exclude exclude, bool isStart) return result; } + private Message GetMultiStreamReadMessage(StreamIdPair[] streamIdPairs, int? countPerStream, CommandFlags flags) + { + // Example: XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 + + if (streamIdPairs == null) throw new ArgumentNullException(nameof(streamIdPairs)); + if (streamIdPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamIdPairs), "streamAndIdPairs must contain at least one item."); + + if (countPerStream.HasValue && countPerStream <= 0) + { + throw new ArgumentOutOfRangeException(nameof(countPerStream), "countPerStream must be greater than 0."); + } + + var values = new RedisValue[ + 1 // Streams keyword. + + (streamIdPairs.Length * 2) // Room for the stream names and the ID from which to begin reading. + + (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null. + + var offset = 0; + + if (countPerStream.HasValue) + { + values[offset++] = StreamConstants.Count; + values[offset++] = countPerStream; + } + + values[offset++] = StreamConstants.Streams; + + // Write the stream names and the message IDs from which to read for the associated stream. Each pair + // will be separated by an offset of the index of the stream name plus the pair count. + + /* + * [0] = COUNT + * [1] = 2 + * [3] = STREAMS + * [4] = stream1 + * [5] = stream2 + * [6] = stream3 + * [7] = id1 + * [8] = id2 + * [9] = id3 + * + * */ + + var pairCount = streamIdPairs.Length; + + for (var i = 0; i < pairCount; i++) + { + values[offset] = streamIdPairs[i].Key.AsRedisValue(); + values[offset + pairCount] = streamIdPairs[i].Id; + + offset++; + } + + return Message.Create(Database, flags, RedisCommand.XREAD, values); + } + private RedisValue GetRange(double value, Exclude exclude, bool isStart) { if (isStart) @@ -2164,6 +2548,302 @@ private Message GetSortedSetRemoveRangeByScoreMessage(RedisKey key, double start GetRange(start, exclude, true), GetRange(stop, exclude, false)); } + private Message GetStreamAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags) + { + var values = new RedisValue[] + { + key.AsRedisValue(), + groupName, + messageId + }; + + return Message.Create(Database, flags, RedisCommand.XACK, values); + } + + private Message GetStreamAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags) + { + if (messageIds == null) throw new ArgumentNullException(nameof(messageIds)); + if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item."); + + var values = new RedisValue[messageIds.Length + 2]; + + var offset = 0; + + values[offset++] = key.AsRedisValue(); + values[offset++] = groupName; + + for (var i = 0; i < messageIds.Length; i++) + { + values[offset++] = messageIds[i]; + } + + return Message.Create(Database, flags, RedisCommand.XACK, values); + } + + private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, int? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, CommandFlags flags) + { + // Calculate the correct number of arguments: + // 3 array elements for Entry ID & NameValueEntry.Name & NameValueEntry.Value. + // 2 elements if using MAXLEN (keyword & value), otherwise 0. + // 1 element if using Approximate Length (~), otherwise 0. + var totalLength = 3 + (maxLength.HasValue ? 2 : 0) + + (maxLength.HasValue && useApproximateMaxLength ? 1 : 0); + + var values = new RedisValue[totalLength]; + var offset = 0; + + values[offset++] = messageId; + + if (maxLength.HasValue) + { + values[offset++] = StreamConstants.MaxLen; + + if (useApproximateMaxLength) + { + values[offset++] = StreamConstants.ApproximateMaxLen; + values[offset++] = maxLength.Value; + } + else + { + values[offset++] = maxLength.Value; + } + } + + values[offset++] = streamPair.Name; + values[offset] = streamPair.Value; + + return Message.Create(Database, flags, RedisCommand.XADD, key, values); + } + + private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, CommandFlags flags) + { + // See https://redis.io/commands/xadd. + + if (streamPairs == null) throw new ArgumentNullException(nameof(streamPairs)); + if (streamPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPairs), "streamPairs must contain at least one item."); + + if (maxLength.HasValue && maxLength <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxLength), "maxLength must be greater than 0."); + } + + var includeMaxLen = maxLength.HasValue ? 2 : 0; + var includeApproxLen = maxLength.HasValue && useApproximateMaxLength ? 1 : 0; + + var totalLength = (streamPairs.Length * 2) // Room for the name/value pairs + + 1 // The stream entry ID + + includeMaxLen // 2 or 0 (MAXLEN keyword & the count) + + includeApproxLen; // 1 or 0 + + var values = new RedisValue[totalLength]; + + var offset = 0; + + values[offset++] = entryId; + + if (maxLength.HasValue) + { + values[offset++] = StreamConstants.MaxLen; + + if (useApproximateMaxLength) + { + values[offset++] = StreamConstants.ApproximateMaxLen; + } + + values[offset++] = maxLength.Value; + } + + for (var i = 0; i < streamPairs.Length; i++) + { + values[offset++] = streamPairs[i].Name; + values[offset++] = streamPairs[i].Value; + } + + return Message.Create(Database, flags, RedisCommand.XADD, key, values); + } + + private Message GetStreamClaimMessage(RedisKey key, RedisValue consumerGroup, RedisValue assignToConsumer, long minIdleTimeInMs, RedisValue[] messageIds, bool returnJustIds, CommandFlags flags) + { + if (messageIds == null) throw new ArgumentNullException(nameof(messageIds)); + if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item."); + + // XCLAIM ... + var values = new RedisValue[4 + messageIds.Length + (returnJustIds ? 1 : 0)]; + + var offset = 0; + + values[offset++] = key.AsRedisValue(); + values[offset++] = consumerGroup; + values[offset++] = assignToConsumer; + values[offset++] = minIdleTimeInMs; + + for (var i = 0; i < messageIds.Length; i++) + { + values[offset++] = messageIds[i]; + } + + if (returnJustIds) + { + values[offset] = StreamConstants.JustId; + } + + return Message.Create(Database, flags, RedisCommand.XCLAIM, values); + } + + private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags) + { + // > XPENDING mystream mygroup - + 10 [consumer name] + // 1) 1) 1526569498055 - 0 + // 2) "Bob" + // 3) (integer)74170458 + // 4) (integer)1 + // 2) 1) 1526569506935 - 0 + // 2) "Bob" + // 3) (integer)74170458 + // 4) (integer)1 + + // See https://redis.io/topics/streams-intro. + + if (count <= 0) + { + throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); + } + + var values = new RedisValue[consumerName == RedisValue.Null ? 5 : 6]; + + values[0] = key.AsRedisValue(); + values[1] = groupName; + values[2] = minId ?? StreamConstants.ReadMinValue; + values[3] = maxId ?? StreamConstants.ReadMaxValue; + values[4] = count; + + if (consumerName != RedisValue.Null) + { + values[5] = consumerName; + } + + return Message.Create(Database, + flags, + RedisCommand.XPENDING, + values); + } + + private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValue? maxId, int? count, Order messageOrder, CommandFlags flags) + { + if (count.HasValue && count <= 0) + { + throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); + } + + var actualMin = minId ?? StreamConstants.ReadMinValue; + var actualMax = maxId ?? StreamConstants.ReadMaxValue; + + var values = new RedisValue[2 + (count.HasValue ? 2 : 0)]; + + values[0] = (messageOrder == Order.Ascending ? actualMin : actualMax); + values[1] = (messageOrder == Order.Ascending ? actualMax : actualMin); + + if (count.HasValue) + { + values[2] = StreamConstants.Count; + values[3] = count.Value; + } + + return Message.Create(Database, + flags, + messageOrder == Order.Ascending ? RedisCommand.XRANGE : RedisCommand.XREVRANGE, + key, + values); + } + + private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? readFromId, int? count, CommandFlags flags) + { + // Example: > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + if (count.HasValue && count <= 0) + { + throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); + } + + var totalValueCount = 6 + (count.HasValue ? 2 : 0); + var values = new RedisValue[totalValueCount]; + + var offset = 0; + + values[offset++] = StreamConstants.Group; + values[offset++] = groupName; + values[offset++] = consumerName; + + if (count.HasValue) + { + values[offset++] = StreamConstants.Count; + values[offset++] = count.Value; + } + + values[offset++] = StreamConstants.Streams; + values[offset++] = key.AsRedisValue(); + values[offset] = readFromId ?? StreamConstants.UndeliveredMessages; + + return Message.Create(Database, + flags, + RedisCommand.XREADGROUP, + values); + } + + private Message GetSingleStreamReadMessage(RedisKey key, RedisValue afterId, int? count, CommandFlags flags) + { + if (count.HasValue && count <= 0) + { + throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); + } + + var values = new RedisValue[3 + (count.HasValue ? 2 : 0)]; + var offset = 0; + + if (count.HasValue) + { + values[offset++] = StreamConstants.Count; + values[offset++] = count.Value; + } + + values[offset++] = StreamConstants.Streams; + values[offset++] = key.AsRedisValue(); + values[offset] = afterId; + + // Example: > XREAD COUNT 2 STREAMS writers 1526999352406-0 + return Message.Create(Database, + flags, + RedisCommand.XREAD, + values); + } + + private Message GetStreamTrimMessage(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) + { + if (maxLength <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxLength), "maxLength must be greater than 0."); + } + + var values = new RedisValue[2 + (useApproximateMaxLength ? 1 : 0)]; + + values[0] = StreamConstants.MaxLen; + + if (useApproximateMaxLength) + { + values[1] = StreamConstants.ApproximateMaxLen; + values[2] = maxLength; + } + else + { + values[1] = maxLength; + } + + return Message.Create(Database, + flags, + RedisCommand.XTRIM, + key, + values); + } + private Message GetStringBitOperationMessage(Bitwise operation, RedisKey destination, RedisKey[] keys, CommandFlags flags) { if (keys == null) throw new ArgumentNullException(nameof(keys)); diff --git a/StackExchange.Redis/StackExchange/Redis/RedisFeatures.cs b/StackExchange.Redis/StackExchange/Redis/RedisFeatures.cs index d0f32a6a3..9dd4469fc 100644 --- a/StackExchange.Redis/StackExchange/Redis/RedisFeatures.cs +++ b/StackExchange.Redis/StackExchange/Redis/RedisFeatures.cs @@ -28,7 +28,8 @@ public struct RedisFeatures v2_8_18 = new Version(2, 8, 18), v2_9_5 = new Version(2, 9, 5), v3_0_0 = new Version(3, 0, 0), - v3_2_0 = new Version(3, 2, 0); + v3_2_0 = new Version(3, 2, 0), + v4_9_1 = new Version(4, 9, 1); // 5.0 RC1 is version 4.9.1 private readonly Version version; /// @@ -120,6 +121,11 @@ public RedisFeatures(Version version) /// public bool SetVaradicAddRemove => Version >= v2_4_0; + /// + /// Are Redis Streams available? + /// + public bool Streams => Version >= v4_9_1; + /// /// Is STRLEN available? /// diff --git a/StackExchange.Redis/StackExchange/Redis/RedisStream.cs b/StackExchange.Redis/StackExchange/Redis/RedisStream.cs new file mode 100644 index 000000000..0383f4ee1 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/RedisStream.cs @@ -0,0 +1,24 @@ +namespace StackExchange.Redis +{ + /// + /// Describes a Redis Stream with an associated array of entries. + /// + public struct RedisStream + { + internal RedisStream(RedisKey key, RedisStreamEntry[] entries) + { + Key = key; + Entries = entries; + } + + /// + /// The key for the stream. + /// + public RedisKey Key { get; } + + /// + /// An arry of entries contained within the stream. + /// + public RedisStreamEntry[] Entries { get; } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/RedisStreamEntry.cs b/StackExchange.Redis/StackExchange/Redis/RedisStreamEntry.cs new file mode 100644 index 000000000..592a9508d --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/RedisStreamEntry.cs @@ -0,0 +1,34 @@ +namespace StackExchange.Redis +{ + /// + /// Describes an entry contained in a Redis Stream. + /// + public struct RedisStreamEntry + { + internal RedisStreamEntry(RedisValue id, NameValueEntry[] values) + { + Id = id; + Values = values; + } + + /// + /// A null stream entry. + /// + public static RedisStreamEntry Null { get; } = new RedisStreamEntry(RedisValue.Null, null); + + /// + /// The ID assigned to the message. + /// + public RedisValue Id { get; } + + /// + /// The values contained within the message. + /// + public NameValueEntry[] Values { get; } + + /// + /// Indicates that the Redis Stream Entry is null. + /// + public bool IsNull => Id == RedisValue.Null && Values == null; + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/ResultProcessor.cs b/StackExchange.Redis/StackExchange/Redis/ResultProcessor.cs index afc1cf3da..df126b39c 100644 --- a/StackExchange.Redis/StackExchange/Redis/ResultProcessor.cs +++ b/StackExchange.Redis/StackExchange/Redis/ResultProcessor.cs @@ -40,6 +40,9 @@ public static readonly ResultProcessor public static readonly ResultProcessor>[]> Info = new InfoProcessor(); + public static readonly MultiStreamProcessor + MultiStream = new MultiStreamProcessor(); + public static readonly ResultProcessor Int64 = new Int64Processor(), PubSubNumSub = new PubSubNumSubProcessor(); @@ -84,6 +87,27 @@ public static readonly ResultProcessor public static readonly SortedSetEntryArrayProcessor SortedSetWithScores = new SortedSetEntryArrayProcessor(); + public static readonly SingleStreamProcessor + SingleStream = new SingleStreamProcessor(); + + public static readonly SingleStreamProcessor + SingleStreamWithNameSkip = new SingleStreamProcessor(skipStreamName: true); + + public static readonly StreamConsumerInfoProcessor + StreamConsumerInfo = new StreamConsumerInfoProcessor(); + + public static readonly StreamGroupInfoProcessor + StreamGroupInfo = new StreamGroupInfoProcessor(); + + public static readonly StreamInfoProcessor + StreamInfo = new StreamInfoProcessor(); + + public static readonly StreamPendingInfoProcessor + StreamPendingInfo = new StreamPendingInfoProcessor(); + + public static readonly StreamPendingMessagesProcessor + StreamPendingMessages = new StreamPendingMessagesProcessor(); + public static ResultProcessor GeoRadiusArray(GeoRadiusOptions options) => GeoRadiusResultArrayProcessor.Get(options); public static readonly ResultProcessor @@ -1301,6 +1325,424 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } + internal sealed class SingleStreamProcessor : StreamProcessorBase + { + private bool skipStreamName; + + public SingleStreamProcessor(bool skipStreamName = false) + { + this.skipStreamName = skipStreamName; + } + + protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) + { + if (result.IsNull) + { + // Server returns 'nil' if no entries are returned for the given stream. + SetResult(message, new RedisStreamEntry[0]); + return true; + } + + if (result.Type != ResultType.MultiBulk) + { + return false; + } + + RedisStreamEntry[] entries = null; + + if (skipStreamName) + { + // Skip the first element in the array (i.e., the stream name). + // See https://redis.io/commands/xread. + + // > XREAD COUNT 2 STREAMS mystream 0 + // 1) 1) "mystream" <== Skip the stream name + // 2) 1) 1) 1519073278252 - 0 <== Index 1 contains the array of stream entries + // 2) 1) "foo" + // 2) "value_1" + // 2) 1) 1519073279157 - 0 + // 2) 1) "foo" + // 2) "value_2" + + // Retrieve the initial array. For XREAD of a single stream it will + // be an array of only 1 element in the response. + var readResult = result.GetItems(); + + // Within that single element, GetItems will return an array of + // 2 elements: the stream name and the stream entries. + // Skip the stream name (index 0) and only process the stream entries (index 1). + entries = ParseRedisStreamEntries(readResult[0].GetItems()[1]); + } + else + { + entries = ParseRedisStreamEntries(result); + } + + SetResult(message, entries); + return true; + } + } + + internal sealed class MultiStreamProcessor : StreamProcessorBase + { + /* + The result is similar to the XRANGE result (see SingleStreamProcessor) + with the addition of the stream name as the first element of top level + Multibulk array. + + See https://redis.io/commands/xread. + + > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 + 1) 1) "mystream" + 2) 1) 1) 1526984818136-0 + 2) 1) "duration" + 2) "1532" + 3) "event-id" + 4) "5" + 2) 1) 1526999352406-0 + 2) 1) "duration" + 2) "812" + 3) "event-id" + 4) "9" + 2) 1) "writers" + 2) 1) 1) 1526985676425-0 + 2) 1) "name" + 2) "Virginia" + 3) "surname" + 4) "Woolf" + 2) 1) 1526985685298-0 + 2) 1) "name" + 2) "Jane" + 3) "surname" + 4) "Austen" + */ + + protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) + { + if (result.IsNull) + { + // Nothing returned for any of the requested streams. The server returns 'nil'. + SetResult(message, new RedisStream[0]); + return true; + } + + if (result.Type != ResultType.MultiBulk) + { + return false; + } + + var arr = result.GetItems(); + + var streams = Array.ConvertAll(arr, item => + { + var details = item.GetItems(); + + // details[0] = Name of the Stream + // details[1] = Multibulk Array of Stream Entries + + return new RedisStream(key: details[0].AsRedisKey(), + entries: ParseRedisStreamEntries(details[1])); + }); + + SetResult(message, streams); + return true; + } + } + + internal sealed class StreamConsumerInfoProcessor : InterleavedStreamInfoProcessorBase + { + protected override StreamConsumerInfo ParseItem(RawResult result) + { + // Note: the base class passes a single consumer from the response into this method. + + // Response format: + // > XINFO CONSUMERS mystream mygroup + // 1) 1) name + // 2) "Alice" + // 3) pending + // 4) (integer)1 + // 5) idle + // 6) (integer)9104628 + // 2) 1) name + // 2) "Bob" + // 3) pending + // 4) (integer)1 + // 5) idle + // 6) (integer)83841983 + + var arr = result.GetItems(); + + return new StreamConsumerInfo(name: arr[1].AsRedisValue(), + pendingMessageCount: (int)arr[3].AsRedisValue(), + idleTimeInMilliseconds: (long)arr[5].AsRedisValue()); + } + } + + internal sealed class StreamGroupInfoProcessor : InterleavedStreamInfoProcessorBase + { + protected override StreamGroupInfo ParseItem(RawResult result) + { + // Note: the base class passes a single item from the response into this method. + + // Response format: + // > XINFO GROUPS mystream + // 1) 1) name + // 2) "mygroup" + // 3) consumers + // 4) (integer)2 + // 5) pending + // 6) (integer)2 + // 2) 1) name + // 2) "some-other-group" + // 3) consumers + // 4) (integer)1 + // 5) pending + // 6) (integer)0 + + var arr = result.GetItems(); + + return new StreamGroupInfo(name: arr[1].AsRedisValue(), + consumerCount: (int)arr[3].AsRedisValue(), + pendingMessageCount: (int)arr[5].AsRedisValue()); + } + } + + internal abstract class InterleavedStreamInfoProcessorBase : ResultProcessor + { + protected abstract T ParseItem(RawResult result); + + protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) + { + if (result.Type != ResultType.MultiBulk) + { + return false; + } + + var arr = result.GetItems(); + var parsedItems = Array.ConvertAll(arr, item => ParseItem(item)); + + SetResult(message, parsedItems); + return true; + } + } + + internal sealed class StreamInfoProcessor : StreamProcessorBase + { + // Parse the following format: + // > XINFO mystream + // 1) length + // 2) (integer) 13 + // 3) radix-tree-keys + // 4) (integer) 1 + // 5) radix-tree-nodes + // 6) (integer) 2 + // 7) groups + // 8) (integer) 2 + // 9) first-entry + // 10) 1) 1524494395530-0 + // 2) 1) "a" + // 2) "1" + // 3) "b" + // 4) "2" + // 11) last-entry + // 12) 1) 1526569544280-0 + // 2) 1) "message" + // 2) "banana" + protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) + { + if (result.Type != ResultType.MultiBulk) + { + return false; + } + + var arr = result.GetItems(); + + if (arr.Length != 12) + { + return false; + } + + // Note: Even if there is only 1 message in the stream, this command returns + // the single entry as the first-entry and last-entry in the response. + + // The first 8 items are interleaved name/value pairs. + // Items 9-12 represent the first and last entry in the stream. The values will + // be nil (stored in index 9 & 11) if the stream length is 0. + + var entries = ParseRedisStreamEntries(RawResult.CreateMultiBulk(arr[9], arr[11])); + + var streamInfo = new StreamInfo(length: (int)arr[1].AsRedisValue(), + radixTreeKeys: (int)arr[3].AsRedisValue(), + radixTreeNodes: (int)arr[5].AsRedisValue(), + groups: (int)arr[7].AsRedisValue(), + firstEntry: entries[0], + lastEntry: entries[1]); + + SetResult(message, streamInfo); + return true; + } + } + + internal sealed class StreamPendingInfoProcessor : ResultProcessor + { + protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) + { + // Example: + // > XPENDING mystream mygroup + // 1) (integer)2 + // 2) 1526569498055 - 0 + // 3) 1526569506935 - 0 + // 4) 1) 1) "Bob" + // 2) "2" + // 5) 1) 1) "Joe" + // 2) "8" + + if (result.Type != ResultType.MultiBulk) + { + return false; + } + + var arr = result.GetItems(); + + if (arr.Length != 4) + { + return false; + } + + StreamConsumer[] consumers = null; + + // If there are no consumers as of yet for the given group, the last + // item in the response array will be null. + if (!arr[3].IsNull) + { + consumers = Array.ConvertAll(arr[3].GetItems(), item => + { + var details = item.GetItems(); + + return new StreamConsumer( + name: details[0].AsRedisValue(), + pendingMessageCount: (int)details[1].AsRedisValue()); + }); + } + + var pendingInfo = new StreamPendingInfo(pendingMessageCount: (int)arr[0].AsRedisValue(), + lowestId: arr[1].AsRedisValue(), + highestId: arr[2].AsRedisValue(), + consumers: consumers ?? new StreamConsumer[0]); + // ^^^^^ + // Should we bother allocating an empty array only to prevent the need for a null check? + + SetResult(message, pendingInfo); + return true; + } + } + + internal sealed class StreamPendingMessagesProcessor : ResultProcessor + { + protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) + { + if (result.Type != ResultType.MultiBulk) + { + return false; + } + + var arr = result.GetItems(); + + var messageInfoArray = Array.ConvertAll(arr, item => + { + var details = item.GetItems(); + + return new StreamPendingMessageInfo(messageId: details[0].AsRedisValue(), + consumerName: details[1].AsRedisValue(), + idleTimeInMs: (long)details[2].AsRedisValue(), + deliveryCount: (int)details[3].AsRedisValue()); + }); + + SetResult(message, messageInfoArray); + return true; + } + } + + internal abstract class StreamProcessorBase : ResultProcessor + { + // For command response formats see https://redis.io/topics/streams-intro. + + protected RedisStreamEntry[] ParseRedisStreamEntries(RawResult result) + { + if (result.Type != ResultType.MultiBulk) + { + return null; + } + + var arr = result.GetItems(); + + return Array.ConvertAll(arr, item => + { + if (item.IsNull || item.Type != ResultType.MultiBulk) + { + return RedisStreamEntry.Null; + } + + // Process the Multibulk array for each entry. The entry contains the following elements: + // [0] = SimpleString (the ID of the stream entry) + // [1] = Multibulk array of the name/value pairs of the stream entry's data + var entryDetails = item.GetItems(); + + return new RedisStreamEntry(id: entryDetails[0].AsRedisValue(), + values: ParseStreamEntryValues(entryDetails[1])); + }); + } + + protected NameValueEntry[] ParseStreamEntryValues(RawResult result) + { + // The XRANGE, XREVRANGE, XREAD commands return stream entries + // in the following format. The name/value pairs are interleaved + // in the same fashion as the HGETALL response. + // + // 1) 1) 1518951480106-0 + // 2) 1) "sensor-id" + // 2) "1234" + // 3) "temperature" + // 4) "19.8" + // 2) 1) 1518951482479-0 + // 2) 1) "sensor-id" + // 2) "9999" + // 3) "temperature" + // 4) "18.2" + + if (result.Type != ResultType.MultiBulk) + { + return null; + } + + var arr = result.GetItems(); + + if (arr == null) + { + return null; + } + + // Calculate how many name/value pairs are in the stream entry. + int count = arr.Length / 2; + + if (count == 0) + { + return new NameValueEntry[0]; + } + + var pairs = new NameValueEntry[count]; + int offset = 0; + + for (int i = 0; i < pairs.Length; i++) + { + pairs[i] = new NameValueEntry(arr[offset++].AsRedisValue(), + arr[offset++].AsRedisValue()); + } + + return pairs; + } + } + private sealed class StringPairInterleavedProcessor : ValuePairInterleavedProcessorBase> { protected override KeyValuePair Parse(RawResult first, RawResult second) diff --git a/StackExchange.Redis/StackExchange/Redis/StreamConstants.cs b/StackExchange.Redis/StackExchange/Redis/StreamConstants.cs new file mode 100644 index 000000000..1b70ed130 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/StreamConstants.cs @@ -0,0 +1,57 @@ + +namespace StackExchange.Redis +{ + /// + /// Constants representing values used in Redis Stream commands. + /// + internal static class StreamConstants + { + /// + /// The "~" value used with the MAXLEN option. + /// + internal static readonly RedisValue ApproximateMaxLen = "~"; + + /// + /// The "*" value used with the XADD command. + /// + internal static readonly RedisValue AutoGeneratedId = "*"; + + /// + /// The "$" value used in the XGROUP command. Indicates reading only new messages from the stream. + /// + internal static readonly RedisValue NewMessages = "$"; + + /// + /// The "-" value used in the XRANGE, XREAD, and XREADGROUP commands. Indicates the minimum message ID from the stream. + /// + internal static readonly RedisValue ReadMinValue = "-"; + + /// + /// The "+" value used in the XRANGE, XREAD, and XREADGROUP commands. Indicates the maximum message ID from the stream. + /// + internal static readonly RedisValue ReadMaxValue = "+"; + + /// + /// The ">" value used in the XREADGROUP command. Use this to read messages that have not been delivered to a consumer group. + /// + internal static readonly RedisValue UndeliveredMessages = ">"; + + internal static readonly RedisValue Consumers = "CONSUMERS"; + + internal static readonly RedisValue Count = "COUNT"; + + internal static readonly RedisValue Create = "CREATE"; + + internal static readonly RedisValue Group = "GROUP"; + + internal static readonly RedisValue Groups = "GROUPS"; + + internal static readonly RedisValue JustId = "JUSTID"; + + internal static readonly RedisValue MaxLen = "MAXLEN"; + + internal static readonly RedisValue Stream = "STREAM"; + + internal static readonly RedisValue Streams = "STREAMS"; + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/StreamConsumer.cs b/StackExchange.Redis/StackExchange/Redis/StreamConsumer.cs new file mode 100644 index 000000000..f37e4be5b --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/StreamConsumer.cs @@ -0,0 +1,25 @@ + +namespace StackExchange.Redis +{ + /// + /// Describes a consumer off a Redis Stream. + /// + public struct StreamConsumer + { + internal StreamConsumer(RedisValue name, int pendingMessageCount) + { + Name = name; + PendingMessageCount = pendingMessageCount; + } + + /// + /// The name of the consumer. + /// + public RedisValue Name { get; } + + /// + /// The number of messages that have been delivered by not yet acknowledged by the consumer. + /// + public int PendingMessageCount { get; } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/StreamConsumerInfo.cs b/StackExchange.Redis/StackExchange/Redis/StreamConsumerInfo.cs new file mode 100644 index 000000000..d7b6adc63 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/StreamConsumerInfo.cs @@ -0,0 +1,32 @@ + +namespace StackExchange.Redis +{ + /// + /// Describes a consumer within a consumer group, retrieved using the XINFO CONSUMERS command. + /// + public struct StreamConsumerInfo + { + internal StreamConsumerInfo(string name, int pendingMessageCount, long idleTimeInMilliseconds) + { + Name = name; + PendingMessageCount = pendingMessageCount; + IdleTimeInMilliseconds = idleTimeInMilliseconds; + } + + /// + /// The name of the consumer. + /// + public string Name { get; } + + /// + /// The number of pending messages for the consumer. A pending message is one that has been + /// received by the consumer but not yet acknowledged. + /// + public int PendingMessageCount { get; } + + /// + /// The idle time, if any, for the consumer. + /// + public long IdleTimeInMilliseconds { get; } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/StreamGroupInfo.cs b/StackExchange.Redis/StackExchange/Redis/StreamGroupInfo.cs new file mode 100644 index 000000000..ae120a5e4 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/StreamGroupInfo.cs @@ -0,0 +1,32 @@ + +namespace StackExchange.Redis +{ + /// + /// Describes a consumer group retrieved using the XINFO GROUPS command. + /// + public struct StreamGroupInfo + { + internal StreamGroupInfo(string name, int consumerCount, int pendingMessageCount) + { + Name = name; + ConsumerCount = consumerCount; + PendingMessageCount = pendingMessageCount; + } + + /// + /// The name of the consumer group. + /// + public string Name { get; } + + /// + /// The number of consumers within the consumer group. + /// + public int ConsumerCount { get; } + + /// + /// The total number of pending messages for the consumer group. A pending message is one that has been + /// received by a consumer but not yet acknowledged. + /// + public int PendingMessageCount { get; } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/StreamIdPair.cs b/StackExchange.Redis/StackExchange/Redis/StreamIdPair.cs new file mode 100644 index 000000000..198396946 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/StreamIdPair.cs @@ -0,0 +1,36 @@ + +namespace StackExchange.Redis +{ + /// + /// Describes a pair consisting of the Stream Key and the ID from which to read. + /// + /// + public struct StreamIdPair + { + /// + /// Initializes a value. + /// + /// The key for the stream. + /// The ID from which to begin reading the stream. + public StreamIdPair(RedisKey key, RedisValue id) + { + Key = key; + Id = id; + } + + /// + /// The key for the stream. + /// + public RedisKey Key { get; } + + /// + /// The ID from which to begin reading the stream. + /// + public RedisValue Id { get; } + + /// + /// See Object.ToString() + /// + public override string ToString() => $"{Key}: {Id}"; + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/StreamInfo.cs b/StackExchange.Redis/StackExchange/Redis/StreamInfo.cs new file mode 100644 index 000000000..3cc2d7337 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/StreamInfo.cs @@ -0,0 +1,54 @@ + +namespace StackExchange.Redis +{ + /// + /// Describes stream information retrieved using the XINFO STREAM command. + /// + public struct StreamInfo + { + internal StreamInfo(int length, + int radixTreeKeys, + int radixTreeNodes, + int groups, + RedisStreamEntry firstEntry, + RedisStreamEntry lastEntry) + { + Length = length; + RadixTreeKeys = radixTreeKeys; + RadixTreeNodes = radixTreeNodes; + ConsumerGroupCount = groups; + FirstEntry = firstEntry; + LastEntry = lastEntry; + } + + /// + /// The number of entries in the stream. + /// + public int Length { get; } + + /// + /// The number of radix tree keys in the stream. + /// + public int RadixTreeKeys { get; } + + /// + /// The number of radix tree nodes in the stream. + /// + public int RadixTreeNodes { get; } + + /// + /// The number of consumers groups in the stream. + /// + public int ConsumerGroupCount { get; } + + /// + /// The first entry in the stream. + /// + public RedisStreamEntry FirstEntry { get; } + + /// + /// The last entry in the stream. + /// + public RedisStreamEntry LastEntry { get; } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/StreamPendingInfo.cs b/StackExchange.Redis/StackExchange/Redis/StreamPendingInfo.cs new file mode 100644 index 000000000..61819b910 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/StreamPendingInfo.cs @@ -0,0 +1,41 @@ + +namespace StackExchange.Redis +{ + /// + /// Describes basic information about pending messages for a consumer group. + /// + public struct StreamPendingInfo + { + internal StreamPendingInfo(int pendingMessageCount, + RedisValue lowestId, + RedisValue highestId, + StreamConsumer[] consumers) + { + PendingMessageCount = pendingMessageCount; + LowestPendingMessageId = lowestId; + HighestPendingMessageId = highestId; + Consumers = consumers; + } + + /// + /// The number of pending messages. A pending message is a message that has been consumed but not yet acknowledged. + /// + public int PendingMessageCount { get; } + + /// + /// The lowest message ID in the set of pending messages. + /// + public RedisValue LowestPendingMessageId { get; } + + /// + /// The highest message ID in the set of pending messages. + /// + public RedisValue HighestPendingMessageId { get; } + + /// + /// An array of consumers within the consumer group that have pending messages. + /// + public StreamConsumer[] Consumers { get; } + + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/StreamPendingMessageInfo.cs b/StackExchange.Redis/StackExchange/Redis/StreamPendingMessageInfo.cs new file mode 100644 index 000000000..180ad6326 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/StreamPendingMessageInfo.cs @@ -0,0 +1,41 @@ + +namespace StackExchange.Redis +{ + /// + /// Describes properties of a pending message. A pending message is one that has + /// been received by a consumer but has not yet been acknowledged. + /// + public struct StreamPendingMessageInfo + { + internal StreamPendingMessageInfo(RedisValue messageId, + RedisValue consumerName, + long idleTimeInMs, + int deliveryCount) + { + MessageId = messageId; + ConsumerName = consumerName; + IdleTimeInMilliseconds = idleTimeInMs; + DeliveryCount = deliveryCount; + } + + /// + /// The ID of the pending message. + /// + public RedisValue MessageId { get; } + + /// + /// The consumer that received the pending message. + /// + public RedisValue ConsumerName { get; } + + /// + /// The time that has passed since the message was last delivered to a consumer. + /// + public long IdleTimeInMilliseconds { get; } + + /// + /// The number of times the message has been delivered to a consumer. + /// + public int DeliveryCount { get; } + } +}