Skip to content

Commit

Permalink
Python: type migration for entries_read (valkey-io#1768)
Browse files Browse the repository at this point in the history
* Python: type migration for entries_read

* converting int to str in args

* Update python/python/glide/async_commands/core.py

Co-authored-by: Yury-Fridlyand <[email protected]>

* Update python/python/glide/async_commands/stream.py

Co-authored-by: Yury-Fridlyand <[email protected]>

---------

Co-authored-by: TJ Zhang <[email protected]>
Co-authored-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
3 people authored and cyip10 committed Jul 16, 2024
1 parent 686a3bc commit fe8ec21
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 36 deletions.
11 changes: 5 additions & 6 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2984,7 +2984,7 @@ async def xgroup_set_id(
key: TEncodable,
group_name: TEncodable,
stream_id: TEncodable,
entries_read_id: Optional[TEncodable] = None,
entries_read: Optional[int] = None,
) -> TOK:
"""
Set the last delivered ID for a consumer group.
Expand All @@ -2995,9 +2995,8 @@ async def xgroup_set_id(
key (TEncodable): The key of the stream.
group_name (TEncodable): The consumer group name.
stream_id (TEncodable): The stream entry ID that should be set as the last delivered ID for the consumer group.
entries_read_id (Optional[TEncodable]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This argument can only be specified if you are using Redis version 7.0.0 or above.
entries_read: (Optional[int]): A value representing the number of stream entries already read by the
group. This option can only be specified if you are using Redis version 7.0.0 or above.
Returns:
TOK: A simple "OK" response.
Expand All @@ -3007,8 +3006,8 @@ async def xgroup_set_id(
OK # The last delivered ID for consumer group "mygroup" was set to 0.
"""
args: List[TEncodable] = [key, group_name, stream_id]
if entries_read_id is not None:
args.extend(["ENTRIESREAD", entries_read_id])
if entries_read is not None:
args.extend(["ENTRIESREAD", str(entries_read)])

return cast(
TOK,
Expand Down
15 changes: 6 additions & 9 deletions python/python/glide/async_commands/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,21 +276,18 @@ class StreamGroupOptions:
MAKE_STREAM_REDIS_API = "MKSTREAM"
ENTRIES_READ_REDIS_API = "ENTRIESREAD"

def __init__(
self, make_stream: bool = False, entries_read_id: Optional[TEncodable] = None
):
def __init__(self, make_stream: bool = False, entries_read: Optional[int] = None):
"""
Options for creating stream consumer groups. Can be used as an optional argument to `XGROUP CREATE`.
Args:
make_stream (bool): If set to True and the stream doesn't exist, this creates a new stream with a
length of 0.
entries_read_id: (Optional[TEncodable]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This option can only be specified if you are using Redis version 7.0.0 or above.
entries_read: (Optional[int]): A value representing the number of stream entries already read by the
group. This option can only be specified if you are using Redis version 7.0.0 or above.
"""
self.make_stream = make_stream
self.entries_read_id = entries_read_id
self.entries_read = entries_read

def to_args(self) -> List[TEncodable]:
"""
Expand All @@ -303,8 +300,8 @@ def to_args(self) -> List[TEncodable]:
if self.make_stream is True:
args.append(self.MAKE_STREAM_REDIS_API)

if self.entries_read_id is not None:
args.extend([self.ENTRIES_READ_REDIS_API, self.entries_read_id])
if self.entries_read is not None:
args.extend([self.ENTRIES_READ_REDIS_API, str(self.entries_read)])

return args

Expand Down
26 changes: 5 additions & 21 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5337,28 +5337,19 @@ async def test_xgroup_create_xgroup_destroy(
key,
group_name1,
stream_id,
StreamGroupOptions(entries_read_id="10"),
StreamGroupOptions(entries_read=10),
)
else:
assert (
await redis_client.xgroup_create(
key,
group_name1,
stream_id,
StreamGroupOptions(entries_read_id="10"),
StreamGroupOptions(entries_read=10),
)
== OK
)

# invalid entries_read_id - cannot be the zero ("0-0") ID
with pytest.raises(RequestError):
await redis_client.xgroup_create(
key,
group_name2,
stream_id,
StreamGroupOptions(entries_read_id="0-0"),
)

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
Expand Down Expand Up @@ -6465,10 +6456,10 @@ async def test_xinfo_groups_xinfo_consumers(
)

if not await check_if_server_version_lt(redis_client, "7.0.0"):
# verify xgroup_set_id with entries_read_id effects the returned value from xinfo_groups
# verify xgroup_set_id with entries_read effects the returned value from xinfo_groups
assert (
await redis_client.xgroup_set_id(
key, group_name1, stream_id1_1, entries_read_id="1"
key, group_name1, stream_id1_1, entries_read=1
)
== OK
)
Expand Down Expand Up @@ -6583,18 +6574,11 @@ async def test_xgroup_set_id(
else:
assert (
await redis_client.xgroup_set_id(
key, group_name, stream_id1_1, entries_read_id=stream_id0
key, group_name, stream_id1_1, entries_read=0
)
== OK
)

# the entries_read_id cannot be the first, last, or zero ID. Here we pass the first ID and assert that an
# error is raised.
with pytest.raises(RequestError):
await redis_client.xgroup_set_id(
key, group_name, stream_id1_1, entries_read_id=stream_id1_0
)

# xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key.encode(): {
Expand Down

0 comments on commit fe8ec21

Please sign in to comment.