Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of Redis Streams. #860

Merged
merged 15 commits into from
Jul 4, 2018

Conversation

ttingen
Copy link
Contributor

@ttingen ttingen commented Jun 17, 2018

The Streams data type is available in release 5.0 RC1 and above.

  • Implemented Sync & Async methods for all Stream related commands (minus the blocking options) as of 5.0 RC1.
  • Added tests for the synchronous versions of the Streams API but the testing is a work in progress. Need to refactor for reuse within the streams tests and write a more thorough suite of tests.
  • Added a NameValueEntry struct which mimicks HashEntry. Using HashEntry for the name/value pairs of stream entries seemed wrong. Perhaps refactor the usage of HashEntry to the more generic NameValueEntry and deprecate HashEntry?

@mgravell
Copy link
Collaborator

mgravell commented Jun 17, 2018 via email

@NickCraver
Copy link
Collaborator

Lots of reading to do indeed! I just went through a quick cursory scan of this and I wanted to say: this is fantastic.

It's the largest first time contribution I've seen to any project and also appears to be very well polished. Thank you. Marc and I were already talking about adding this and the other APIs we haven't caught up with (including overloads) in v2 that we're working on now. This is perfect timing too. Marc's busy on the pipelines branch at the moment but we'll give this a shakedown and get it in ASAP. The test suite helps immensely here. Again: very well done. I'm impressed.

@ttingen
Copy link
Contributor Author

ttingen commented Jun 17, 2018

@mgravell Yes, I'm willling to contribute this PR under the project's license.

Thanks for the positive feedback, streams is a feature that I look forward to using.

I already see some things that I missed in my hasty review last night at midnight. Copypasta got me on some of the unit test comments and the XML documentation on the NameValueEntry struct still references HashEntry, those things need to be cleaned up (I have since cleaned these up).

I also didn't like the naming of some of the API methods and parameters (e.g., StreamClaimMessagesReturningIds, and some others). The names are sufficiently descriptive but I think they could be improved. Naming is hard...

@NickCraver
Copy link
Collaborator

Need to go through code next, but have 5.0-rc3 up and running and test suite against it here via WSL:
image
image

I'll start on code review right now, but will take a bit since you've done so much work here :)

Copy link
Collaborator

@NickCraver NickCraver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a lot to look over, but did an initial pass. Looking good!

The main thing I see here is we can align with the other APIs and collapse methods with int? count and Order overloads, which results in a lot of overloads removed and a lot less to maintain. If you want me to take a stab at reducing this down I'm happy to, just let me know :)


<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove this change - it's something older versions of Visual Studio keep adding back, but this regression was fixed in 15.7.

/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group that received the message.</param>
/// <param name="messageIds">The ID's of the messages to acknowledge.</param>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ID's => IDs (same elsewhere in PR)

/// <param name="useApproximateMaxLength">If true, use the "~" argument to keep the stream length at or slightly above the maxLength.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks>https://redis.io/commands/xadd</remarks>
string StreamAdd(RedisKey key, NameValueEntry[] streamFields, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamFields => streamPairs? streamNameValues? nameValuePairs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like streamPairs, will go with that.

@@ -2164,6 +2805,171 @@ private Message GetSortedSetRemoveRangeByScoreMessage(RedisKey key, double start
GetRange(start, exclude, true), GetRange(stop, exclude, false));
}

private Message GetStreamAcknowledgeMessage(RedisKey key, RedisValue groupName, CommandFlags flags, params string[] messageIds)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach creates an array on every call, even in the single messageId case - we'd need an additional overload here to not allocate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing query: is messageId a strict string? Or should it be RedisValue?

return Message.Create(Database, flags, RedisCommand.XACK, arr);
}

private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags, params NameValueEntry[] streamFields)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above - this creates a NameValueEntry[] array even for a single instance call (likely a common case).

{
var msg = GetStreamPendingMessagesMessage(key, groupName, minId, maxId, count, consumerName, flags);
return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we keep sync/async pattern on methods here? last 4 aren't paired up

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought there used to be a test that checked this with reflection... If not, I can add

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this one, there are sync/async methods for the StreamPendingMessageInfoGet operation. Or is the naming too similar to StreamPendingInfoGet?

StreamPendingInfoGet returns a summary of pending information for the stream, whereas StreamPendingMessageInfoGet returns the information for each pending message.

image

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case there are 2 issues:

  • Ordering (should be method, async equivalent, method, async equivalent) for consistency.
  • I don't think we need 2 sets of methods here - if consumer name is null, we don't return details (just default it to RedisValue.Null)
    ...that gets the same functionality while simplifying the API. We should document this in the usage though.

return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages);
}

public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue minId, RedisValue maxId, CommandFlags flags = CommandFlags.None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for below - instead of 2 methods - how about we make count an int? on these? That'd take us from 4 (sync+async) methods down to 2.

return ExecuteSync(msg, ResultProcessor.SingleStream);
}

public RedisStreamEntry[] StreamRangeReverse(RedisKey key, RedisValue maxId, RedisValue minId, CommandFlags flags = CommandFlags.None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above - by naming count nullable, we can collapse overloads here and simplify

Overall: for API consistency instead of StreamRangeReverse, look at how SortedSetRangeByRank and others work - add a Order order = Order.Ascending argument in the same way - this again halves our overloads needed :)

return ExecuteAsync(msg, ResultProcessor.SingleStreamWithNameSkip);
}

public Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int count, CommandFlags flags = CommandFlags.None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as elsewhere: int? count and collapse overloads

return ExecuteAsync(msg, ResultProcessor.MultiStream);
}

public Task<RedisStream[]> StreamReadAsync(IList<KeyValuePair<RedisKey, RedisValue>> streamWithAfterIdList, int countPerStream, CommandFlags flags = CommandFlags.None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collapse: int? count with above.

@ttingen
Copy link
Contributor Author

ttingen commented Jun 26, 2018

Lots to update. I will start on these changes tomorrow evening.

@ttingen
Copy link
Contributor Author

ttingen commented Jun 27, 2018

Just commited many of the smaller items from the change request. I will try to knock out the remainder of them tomorrow.

I still need to do the following:

  • Reduce the number of overloads.
  • Remove net45 target to allow Array.Empty usage and then make those changes.
  • Convert messageId from string to RedisValue.
  • Update the method ordering to follow the sync/async pattern.

@NickCraver
Copy link
Collaborator

NickCraver commented Jun 28, 2018

Note: I pushed a launch-all-servers bash script in 18c607d that I'm using to fire up all servers in WSL for testing. This should be useful for anyone testing this PR. @ttingen If you can yank in master on the next pass, that'll make life easier :)

Appreciate all the work here - looking good. And don't worry about the Array.Empty<T> comments...it may be easier to do those post-PR given it increases file scope and such anyway. Just leave as-is and I'll tidy a bit after when unifying with pipelines direction (netstandard2.0 only).

@ttingen
Copy link
Contributor Author

ttingen commented Jun 30, 2018

I believe I completed all of the requested changes (minus the Array.Empty changes). I added a few more tests and I will continue that effort.

I didn't collapse any of the StreamAdd overloads but I would like to remove half of them. If the streamEntryId parameter were a string, we could default it to "*". But I understand the reasoning to want to standardize on using the RedisValue type for those values.

Copy link
Collaborator

@NickCraver NickCraver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ttingen First: awesome. The cleanup on methods is great. I'm away most of the day but did a quick pass on the (now much better!) API. Leaving thoughts on how to collapse methods and looking for feedback on tightening up the naming a bit - would love thoughts when you have time /cc @mgravell as well.

Side note: if you fetch and git merge master here and push up - the misc diffs that aren't really in this PR will disappear :)

/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
/// <returns>The ID of the newly created message.</returns>
/// <remarks>https://redis.io/commands/xadd</remarks>
RedisValue StreamAdd(RedisKey key, RedisValue streamEntryId, RedisValue streamField, RedisValue streamValue, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we move streamEntryId as the first optional arg (on both sync and async)? That'll eliminate down 4 overloads in a hurry (2 on single fields, 2 on pairs overloads).

Copy link
Contributor Author

@ttingen ttingen Jun 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the type to string since we can’t default to RedisValue.Null? This would be the only place we wouldn’t be using RedisValue for the message IDs. And while I’m at it, change the name to messageId as well? IIRC, this is the only method that uses the name "streamEntryId".

/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A <see cref="StreamInfo"/> instance with information about the stream.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
StreamInfo StreamInfoGet(RedisKey key, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this should be just StreamInfo (we don't have Get as a convention or anything - just in the few places it makes sense).

A few options to consider:

  1. StreamGetInfo, StreamGetGroupInfo, StreamGetConsumerInfo
  2. StreamInfo, StreamGroupInfo, StreamConsumerInfo
  3. StreamGetInfo, StreamGetGroups, StreamGetConsumers

They're all XINFO under the covers, and IMO it'd be nice to indicate that in naming if we can - I'm personally a fan of option 2. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like #2 as well.

/// <returns>An instance of <see cref="StreamPendingInfo"/>. <see cref="StreamPendingInfo"/> contains the number of pending messages, the highest and lowest ID of the pending messages, and the consumers with their pending message count.</returns>
/// <remarks>The equivalent of calling XPENDING key group.</remarks>
/// <remarks>https://redis.io/commands/xpending</remarks>
StreamPendingInfo StreamPendingInfoGet(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamPending?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
/// <remarks>https://redis.io/commands/xpending</remarks>
StreamPendingMessageInfo[] StreamPendingMessageInfoGet(RedisKey key, RedisValue groupName, RedisValue minId, RedisValue maxId, int count, RedisValue consumerName, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamPendingMessages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

/// <returns>An instance of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key1 key2 id1 id2.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
RedisStream[] StreamRead(IList<KeyValuePair<RedisKey, RedisValue>> streamAndIdPairs, int? countPerStream = null, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgravell thoughts here? We don't have this type of input type (AFAIK) anywhere else in the API, the closest equivalent is SortedSetEntry[] for adds, which incurs the array overhead you'd likely want to avoid in streams in general...suggestions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve always thought this one looked clunky. An input type tailored for this method would look a lot cleaner.

Something like this?
StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream, ...)

@NickCraver NickCraver dismissed their stale review June 30, 2018 11:17

Outdated - awesome fixes up since.

ttingen added 8 commits June 30, 2018 12:37
The Streams data type is available in releases 5.0 RC1 and above.

- Implemented Sync & Async methods for all Stream related commands (minus the blocking options) as of 5.0 RC1:
    XACK,
    XADD,
    XCLAIM,
    XDEL,
    XGROUP,
    XINFO,
    XLEN,
    XPENDING,
    XRANGE,
    XREAD,
    XREADGROUP,
    XREVRANGE,
    XTRIM
- Added tests for the synchronous versions of the Streams API but the testing is a work in progress. Need to refactor for reuse within the streams tests and write a thorough suite of tests.
- Added a NameValueEntry struct which mimicks HashEntry. Using HashEntry for the name/value pairs of stream entries seemed wrong. Perhaps refactor the usage of HashEntry to the more generic NameValueEntry and deprecate HashEntry?
- Cleaned up some comments in the test suite.
- Added two new tests (StreamReadMultipleStreamsWithCount & StreamConsumerGroupClaimMessagesReturningIds).
- Updated the XML docs on NameValueEntry to remove the old references to HashEntry.
…reamInfoGet.

StreamInfoGet can return a null value for FirstEntry and LastEntry if the stream has no entries.
- Added overload to GetStreamAcknowledgeMessage method to receive a single messageId instead of an array.
- Added overload to GetStreamAddMessage to receive a single NameValueEntry instead of an array.
- Corrected the constructor usage of ArgumentOutOfRangeException.
- Converted to get-only properties on RedisStream and other structs.
- Converted some classes to structs (StreamInfo, StreamGroupInfo, etc.)
- Miscellaneous cleanup in ResultProcessor.cs & convert many for loops to Array.ConvertAll.
- Renamed StreamMinValue and StreamMaxValue to ReadMinValue and ReadMaxValue.
- Renamed streamFields input variable on several methods to streamPairs.
- Updated XML docs (ID's to IDs).
- Reverted changes to the Tests csproj file.
- Updated the type for messageId from string to RedisValue.
- Collapsed several overloads.
- Updated the method order in several places to follow the sync/async pattern.
- Added more tests.
- Miscellaneous cleanup.

Signed-off-by: ttingen <[email protected]>
@magmasystems
Copy link

Thanks for this. I did some initial work in my own framework (sitting on top of StackExchange.Redis) to put in support for Streams, and all I had to change in the Redis 1.26 source was to add the Stream enum to RedisType.cs. Everything else was handled by my own framework, and a healthy dose of calling Execute() and ExecuteAsync() and hand-coding the stream parsing. I am looking forward to the official support for streams that you guys are implementing.

@ttingen
Copy link
Contributor Author

ttingen commented Jul 1, 2018

@magmasystems, thanks for mentioning RedisType. That needed to be added.

@magmasystems
Copy link

magmasystems commented Jul 1, 2018

No probs, @ttingen . I had to do it because I wrote a custom WPF-based IDE and Test Harness for Redis that needed to query the data type of what a key pointed to in order to have a custom WPF data template.

@mgravell
Copy link
Collaborator

mgravell commented Jul 1, 2018

Presumably the "mutate" commands are "master only". It may be worthwhile doing a quick review and adding those ones to IsMasterOnly in Message.cs - that will avoid them being sent incorrectly to replica (slave) nodes.

@NickCraver
Copy link
Collaborator

@mgravell It shouldn't stop this PR because it's really a more global/separate issue, but note #354 is related here - I added some notes/a branch there.

ttingen added 2 commits July 1, 2018 17:00
- Removed 4 StreamAdd overloads.
- Renamed streamEntryId to messageId and defaulted it to "*" (auto-generated ID).
- Added a StreamIdPair struct and updated one of the StreamRead overloads to use it.
@ttingen
Copy link
Contributor Author

ttingen commented Jul 2, 2018

@NickCraver, thanks and it's been a pleasure.

@mgravell
Copy link
Collaborator

mgravell commented Jul 2, 2018

Heya; there's a lot of work here, so once again: thanks hugely, this PR is great. I've been deliberately and consciously avoiding jumping in so that we had a second set of "clean eyes" for a final review.

Firstly: the code is looking in great shape, thanks. It is also without doubt the single biggest PR we've received for SE.Redis, so: (awards sticky gold star)

There's a couple of things I want to quickly ask about before we click that final merge button, if you don't mind (but: this is definitely very close!)

Firstly, XREAD; my understanding is that there is a multi-key version of this that allows the consumer to fetch from multiple streams; I couldn't see an API that surfaces this - is that intentional?


Secondly, I'm still a little ... "uncomfortable" about the ambiguity of message id; in some places (mostly: add) they are a string, and in other (most) places they are RedisValue. I have seen your comment above about wanting to default to "*", but ... I think this is actually an implementation detail that we don't need to surface in the API. I suspect the most appropriate API here is probably:

RedisValue messageId = default

if that compiles, or

RedisValue? messageId = null

if it doesn't; then just check in the implementation code whether the messageId is nil; if it is, then the implementation code can substitute the wildcard.


This made me think about StreamConstants in general; it looks like almost all of these are internal implementation details, which is good. I'm a little suspicious of StreamConstants.ReadMinValue etc for similar reasons to messageId above - I wonder if that should also just default to a nil RedisValue using the same approach used by messageId above. Essentially: "if you don't supply this, it'll use the min/max", which is I think the intent? If not, please let me know!

With that done, I wonder whether StreamConstants can be made internal - as far as I can see, there's nothing left that isn't already handled by API options rather than magic values on the API.

Thoughts?

@magmasystems
Copy link

Apologies for butting in here with something that may be slightly off-topic to the PR.

I have spent a good part of my working life architecting and developing low-latency trading systems and other systems that use messaging. Usually that messaging is done through a JMS or AMQP interface over a bus like Tibco EMS, RabbitMQ, or ActiveMQ.

I think that Marc and Nick's philosophy for StackExchange.Redis is to maintain a fairly thin wrapper over the Redis API, and not to introduce any opinionated higher-level framework over StackExchange.Redis. But I am wondering if there may be an appetite for some higher-level framework that sits over the streams. For example, a RedisStreamProducer that can publish a stream to a "topic" (where the "topic" is the key of a stream) and a RedisStreamConsumer which fires a C# event when data is read from a "topic".

When I first tested out the new Redis streams, I wrote something like that in my own code, and I did not consider altering the StackExchange.Redis code in any way, given the fact that I view StackExchange.Redis as a wrapper. If Marc/Nick thinks that a higher-level framework is interesting, I can try to put something together. Otherwise, I will just keep the JMS stuff in my own code,

@mgravell
Copy link
Collaborator

mgravell commented Jul 2, 2018

@magmasystems my main concern there is: is there an existing that would allow it to be consumed from multiple places? or would it have to be provider specific. If it is provider specific, I'd rather keep it out of the main lib purely for dependency reasons; there's scope for sibling-libraries, though, that re-expose the "raw" library in the context of what that means for "tool X"

@NickCraver
Copy link
Collaborator

I think this is actually an implementation detail that we don't need to surface in the API. I suspect the most appropriate API here is probably

Agreed

I wonder whether StreamConstants can be made internal

Also agree, same with making them null by default and min/max in the implementation.

@magmasystems I think this would be where another library sits atop the APIs here, could be my view there. We could always add more concepts later if doing so was very appealing.

@ttingen
Copy link
Contributor Author

ttingen commented Jul 2, 2018

Firstly, XREAD; my understanding is that there is a multi-key version of this that allows the consumer to fetch from multiple streams; I couldn't see an API that surfaces this...

The StreamRead overload that takes an array of StreamIdPair values is the API that reads from multiple streams. It's the one that previously used an IList<KeyValuePair<RedisKey, RedisValue>> as an input parameter.

RedisStream[] StreamRead(StreamIdPair[] streamIdPairs, int? countPerStream = null, ...)

The current version looks a lot cleaner than the original but perhaps the naming of the StreamIdPair struct could be improved. I started with StreamAndIdPair but didn't like that name too much. Thoughts?

The StreamReadGroup API does however only expose the ability to read from a single stream at a time. Do you want me to add an overload that allows reading multiple streams there as well?

...about the ambiguity of message id...

Totally agree, I will make the change to RedisValue? messageId = null. The RedisValue messageId = default wouldn't compile with the current language setting (7.0) for the library.

This made me think about StreamConstants [being changed to] internal...

The only other two "constants" I was concerned about were "$" (new messages) and ">" (undelivered messages), they would be used with the XGROUP CREATE and XREADGROUP commands. But those values would work as sensible defaults for those APIs as well.

I will update the documentation to include details on the default values.

@mgravell / @NickCraver, let me know what you think about StreamIdPair and the additional StreamReadGroup overload. I will start making these changes later this evening.

@ttingen
Copy link
Contributor Author

ttingen commented Jul 3, 2018

Additionally, I only created the single-stream-reading version of StreamReadGroup to keep the API simple and straightforward.

I felt it would minimize confusion to only expose the single stream version. For example, to read multiple streams at once using XREADGROUP, each stream would need to have created a Consumer Group with the same name. Not that setting up the consumer groups is difficult, I just like the "don't make me think" mentality and decided to keep it simple.

But I'll certainly add the overload if you want it there.

EDIT: I gave this more consideration and I will add the overload. Since this is a more generalized library, I shouldn’t limit the scope to just my use cases or sensibilities.

ttingen added 2 commits July 2, 2018 22:13
- messageId on StreamAdd methods was changed from string to RedisValue? and is defaulted to null. When null, the API will send the auto-generated ID option ("*").
- StreamConstants accessibility was updated from public to internal.
- minId & maxId input parameters were updated to be of type RedisValue?, sends StreamConstants.ReadMinValue & StreamConstants.ReadMaxValue when null.
- readFromId on StreamCreateConsumerGroup updated to type RedisValue?, sends StreamConstants.NewMessages when null.
- readFromId on StreamReadGroup updated to type RedisValue?, sends StreamConstants.UndeliveredMessages when null.
@mgravell
Copy link
Collaborator

mgravell commented Jul 3, 2018

I started with StreamAndIdPair but didn't like that name too much. Thoughts?

Naming is hard; it does what it says, I guess; I think it is fine - thanks for the clarification

EDIT: I gave this more consideration and I will add the overload. Since this is a more generalized library, I shouldn’t limit the scope to just my use cases or sensibilities.

Fair enough; I wasn't going to ask you to add it, but if it is there : great

Will glance over code again :)

@mgravell
Copy link
Collaborator

mgravell commented Jul 3, 2018

In a bunch of the tests, things there's a lot of changes like:

- db.StreamCreateConsumerGroup(key, groupName, StreamConstants.ReadMinValue);
+ db.StreamCreateConsumerGroup(key, groupName, "-");

Just so I understand: is this because you're being explicit? i.e. would this be semantically equivalent? or is it different?

db.StreamCreateConsumerGroup(key, groupName);

(just so I understand)

@mgravell
Copy link
Collaborator

mgravell commented Jul 3, 2018

API looks a lot cleaner, thanks!

@ttingen
Copy link
Contributor Author

ttingen commented Jul 3, 2018

I defaulted the readFrom parameter to "$" (only read new messages) for StreamCreateConsumerGroup and ">" (only read undelivered messages) for StreamReadGroup.

Those are the options used in the first mentions of the XGROUP CREATE and XREADGROUP commands in the intro to streams docs and IIRC those options are only valid for those commands. I believe they are reasonable defaults but they can be overridden by passing "-" (start from the lowest ID in the stream) or by passing an actual ID from the stream. Many of the tests for Consumer Groups needed to read from the beginning of the stream so I was passing "-" when creating the consumer group.

Another interesting caveat when using "-" with XREADGROUP ... when used in conjunction with a consumer group that was created to only read new messages, passing "-" to XREADGROUP will read from the beginning of the stream of messages created AFTER the consumer group was created, not the actual beginning of the stream.

Consumer Groups is definitely one of the more complex features of Redis Streams. I suppose the API could be modeled to expose each of the available options as a different method, at least that would be very explicit. Leaving it as-is certainly enables a lot of flexibility in how each method can be utilized but proper use requires a sound understanding of the underlying commands and their options.

Or maybe, instead of explicit methods, the API takes and options struct that would detail how the command should be constructed. Hmm, I'll put some thought into how this functionality could be made more intuitive in the API.

I also updated the method documentation to highlight the default value that would be passed to the command if a value is not supplied for the parameter.

...Sorry for lengthy response...I will work on the remaining overload tomorrow morning.

@mgravell
Copy link
Collaborator

mgravell commented Jul 3, 2018

It is also entirely possible that what you have is perfectly fine and usable. Trying to make an API completely idiot-proof can also mean bending it out of shape; I'm not sure that is necessarily an improvement :) I'd welcome any more thoughts you have on this, but "I've considered it, and I'm happy with it like this" might also be fine and reasonable.

@ttingen
Copy link
Contributor Author

ttingen commented Jul 3, 2018

Trying to make an API completely idiot-proof can also mean bending it out of shape.

Great point. :)

Copy link
Collaborator

@NickCraver NickCraver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just gave this a final test run locally and I think we're good - merging into master. @ttingen thanks once again for an awesome PR here. If we find something we need to break to fix after 1.x, we have 2.x on the horizon, so we're in a good position to do so if the need arises. But, I think we're good here. The API is looking sharp!

@NickCraver NickCraver merged commit 4a8061f into StackExchange:master Jul 4, 2018
@ttingen
Copy link
Contributor Author

ttingen commented Jul 4, 2018

@NickCraver I just added a commit to my fork that adds the multi-stream reading overload for StreamReadGroup. I also updated some documentation and parameter names to be "more correct". Check it out and let me know if you want me to do another PR to add those changes.

@mgravell
Copy link
Collaborator

mgravell commented Jul 4, 2018

merged into 2.0 - the only awkward bit was the temp/fake multi-bulk around ParseRedisStreamEntries - I think I've converted that code correctly. Thanks again.

(/cc @ttingen @NickCraver )

@ttingen
Copy link
Contributor Author

ttingen commented Jul 4, 2018

@mgravell @NickCraver I tinkered with passing an options struct to the StreamCreateConsumerGroup method, I think I like it. Now there's no need for the user to know any of the magic strings. What do you think?

db.StreamCreateConsumerGroupAsync("events", "consumerGroup", GroupCreateOptions.BeginningOfStream);

image

image

@mgravell
Copy link
Collaborator

mgravell commented Jul 4, 2018 via email

@ttingen
Copy link
Contributor Author

ttingen commented Jul 4, 2018

@NickCraver @mgravell And thanks again for taking so much time to work with me on this PR, I know you two are very busy individuals.

@mgravell
Copy link
Collaborator

mgravell commented Jul 4, 2018

Meh, everyone is busy :) But because we've been busy doing 47 other things is why we love so much that someone else found the time to look at this and do the thinking for us :)

@nikoudel
Copy link

nikoudel commented Aug 8, 2018

@ttingen You've done a great job, thank you very much for this!

Are you planning to add support for XREAD BLOCK at some point or is there any other way to subscribe for new incoming items?

@mgravell
Copy link
Collaborator

mgravell commented Aug 8, 2018

blocking commands are incompatible with the connection-sharing approach used by SE.Redis; my instinct would be "no"; see the discussion around "blocking pops" here for the same problem.

@PrashanthShetty
Copy link

Is there any workaround for XREAD BLOCK ?

@mgravell
Copy link
Collaborator

mgravell commented Jan 19, 2019 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants