Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: type migration for entries_read #1768

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2984,7 +2984,7 @@ async def xgroup_set_id(
key: TEncodable,
group_name: TEncodable,
stream_id: TEncodable,
entries_read_id: Optional[TEncodable] = None,
entries_read: Optional[int] = None,
) -> TOK:
"""
Set the last delivered ID for a consumer group.
Expand All @@ -2995,9 +2995,8 @@ async def xgroup_set_id(
key (TEncodable): The key of the stream.
group_name (TEncodable): The consumer group name.
stream_id (TEncodable): The stream entry ID that should be set as the last delivered ID for the consumer group.
entries_read_id (Optional[TEncodable]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This argument can only be specified if you are using Redis version 7.0.0 or above.
entries_read: (Optional[int]): A value representing the number of stream entries already read by the
group. This option can only be specified if you are using Redis version 7.0.0 or above.

Returns:
TOK: A simple "OK" response.
Expand All @@ -3007,8 +3006,8 @@ async def xgroup_set_id(
OK # The last delivered ID for consumer group "mygroup" was set to 0.
"""
args: List[TEncodable] = [key, group_name, stream_id]
if entries_read_id is not None:
args.extend(["ENTRIESREAD", entries_read_id])
if entries_read is not None:
args.extend(["ENTRIESREAD", str(entries_read)])

return cast(
TOK,
Expand Down
15 changes: 6 additions & 9 deletions python/python/glide/async_commands/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,21 +276,18 @@ class StreamGroupOptions:
MAKE_STREAM_REDIS_API = "MKSTREAM"
ENTRIES_READ_REDIS_API = "ENTRIESREAD"

def __init__(
self, make_stream: bool = False, entries_read_id: Optional[TEncodable] = None
):
def __init__(self, make_stream: bool = False, entries_read: Optional[int] = None):
"""
Options for creating stream consumer groups. Can be used as an optional argument to `XGROUP CREATE`.

Args:
make_stream (bool): If set to True and the stream doesn't exist, this creates a new stream with a
length of 0.
entries_read_id: (Optional[TEncodable]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This option can only be specified if you are using Redis version 7.0.0 or above.
entries_read: (Optional[int]): A value representing the number of stream entries already read by the
group. This option can only be specified if you are using Redis version 7.0.0 or above.
"""
self.make_stream = make_stream
self.entries_read_id = entries_read_id
self.entries_read = entries_read

def to_args(self) -> List[TEncodable]:
"""
Expand All @@ -303,8 +300,8 @@ def to_args(self) -> List[TEncodable]:
if self.make_stream is True:
args.append(self.MAKE_STREAM_REDIS_API)

if self.entries_read_id is not None:
args.extend([self.ENTRIES_READ_REDIS_API, self.entries_read_id])
if self.entries_read is not None:
args.extend([self.ENTRIES_READ_REDIS_API, str(self.entries_read)])

return args

Expand Down
26 changes: 5 additions & 21 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5337,28 +5337,19 @@ async def test_xgroup_create_xgroup_destroy(
key,
group_name1,
stream_id,
StreamGroupOptions(entries_read_id="10"),
StreamGroupOptions(entries_read=10),
)
else:
assert (
await redis_client.xgroup_create(
key,
group_name1,
stream_id,
StreamGroupOptions(entries_read_id="10"),
StreamGroupOptions(entries_read=10),
)
== OK
)

# invalid entries_read_id - cannot be the zero ("0-0") ID
with pytest.raises(RequestError):
await redis_client.xgroup_create(
key,
group_name2,
stream_id,
StreamGroupOptions(entries_read_id="0-0"),
)

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
Expand Down Expand Up @@ -6465,10 +6456,10 @@ async def test_xinfo_groups_xinfo_consumers(
)

if not await check_if_server_version_lt(redis_client, "7.0.0"):
# verify xgroup_set_id with entries_read_id effects the returned value from xinfo_groups
# verify xgroup_set_id with entries_read effects the returned value from xinfo_groups
assert (
await redis_client.xgroup_set_id(
key, group_name1, stream_id1_1, entries_read_id="1"
key, group_name1, stream_id1_1, entries_read=1
)
== OK
)
Expand Down Expand Up @@ -6583,18 +6574,11 @@ async def test_xgroup_set_id(
else:
assert (
await redis_client.xgroup_set_id(
key, group_name, stream_id1_1, entries_read_id=stream_id0
key, group_name, stream_id1_1, entries_read=0
)
== OK
)

# the entries_read_id cannot be the first, last, or zero ID. Here we pass the first ID and assert that an
# error is raised.
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(
key, group_name, stream_id1_1, entries_read_id=stream_id1_0
)

# xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key.encode(): {
Expand Down
Loading