Skip to content

Commit

Permalink
Python: adds XADD, XTRIM commands (valkey-io#1320)
Browse files Browse the repository at this point in the history
  • Loading branch information
shohamazon authored and cyip10 committed Jun 24, 2024
1 parent 5bfc0f1 commit 66b4916
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Python: Added PFADD command ([#1315](https://github.com/aws/glide-for-redis/pull/1315))
* Python: Added ZMSCORE command ([#1357](https://github.com/aws/glide-for-redis/pull/1357))
* Python: Added HRANDFIELD command ([#1334](https://github.com/aws/glide-for-redis/pull/1334))
* Python: Added XADD, XTRIM commands ([#1320](https://github.com/aws/glide-for-redis/pull/1320))

#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
Expand Down
8 changes: 8 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
GeoUnit,
InfoSection,
InsertPosition,
StreamAddOptions,
StreamTrimOptions,
TrimByMaxLen,
TrimByMinId,
UpdateOptions,
)
from glide.async_commands.redis_modules import json
Expand Down Expand Up @@ -94,6 +98,10 @@
"RangeByIndex",
"RangeByLex",
"RangeByScore",
"StreamAddOptions",
"StreamTrimOptions",
"TrimByMaxLen",
"TrimByMinId",
"UpdateOptions",
# Logger
"Logger",
Expand Down
194 changes: 193 additions & 1 deletion python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from enum import Enum
from typing import (
Expand Down Expand Up @@ -149,6 +149,134 @@ def __init__(self, longitude: float, latitude: float):
self.latitude = latitude


class StreamTrimOptions(ABC):
"""
Abstract base class for stream trim options.
"""

@abstractmethod
def __init__(
self,
exact: bool,
threshold: Union[str, int],
method: str,
limit: Optional[int] = None,
):
"""
Initialize stream trim options.
Args:
exact (bool): If `true`, the stream will be trimmed exactly.
Otherwise the stream will be trimmed in a near-exact manner, which is more efficient.
threshold (Union[str, int]): Threshold for trimming.
method (str): Method for trimming (e.g., MINID, MAXLEN).
limit (Optional[int]): Max number of entries to be trimmed. Defaults to None.
Note: If `exact` is set to `True`, `limit` cannot be specified.
"""
if exact and limit:
raise ValueError(
"If `exact` is set to `True`, `limit` cannot be specified."
)
self.exact = exact
self.threshold = threshold
self.method = method
self.limit = limit

def to_args(self) -> List[str]:
"""
Convert options to arguments for Redis command.
Returns:
List[str]: List of arguments for Redis command.
"""
option_args = [
self.method,
"=" if self.exact else "~",
str(self.threshold),
]
if self.limit is not None:
option_args.extend(["LIMIT", str(self.limit)])
return option_args


class TrimByMinId(StreamTrimOptions):
"""
Stream trim option to trim by minimum ID.
"""

def __init__(self, exact: bool, threshold: str, limit: Optional[int] = None):
"""
Initialize trim option by minimum ID.
Args:
exact (bool): If `true`, the stream will be trimmed exactly.
Otherwise the stream will be trimmed in a near-exact manner, which is more efficient.
threshold (str): Threshold for trimming by minimum ID.
limit (Optional[int]): Max number of entries to be trimmed. Defaults to None.
Note: If `exact` is set to `True`, `limit` cannot be specified.
"""
super().__init__(exact, threshold, "MINID", limit)


class TrimByMaxLen(StreamTrimOptions):
"""
Stream trim option to trim by maximum length.
"""

def __init__(self, exact: bool, threshold: int, limit: Optional[int] = None):
"""
Initialize trim option by maximum length.
Args:
exact (bool): If `true`, the stream will be trimmed exactly.
Otherwise the stream will be trimmed in a near-exact manner, which is more efficient.
threshold (int): Threshold for trimming by maximum length.
limit (Optional[int]): Max number of entries to be trimmed. Defaults to None.
Note: If `exact` is set to `True`, `limit` cannot be specified.
"""
super().__init__(exact, threshold, "MAXLEN", limit)


class StreamAddOptions:
"""
Options for adding entries to a stream.
"""

def __init__(
self,
id: Optional[str] = None,
make_stream: bool = True,
trim: Optional[StreamTrimOptions] = None,
):
"""
Initialize stream add options.
Args:
id (Optional[str]): ID for the new entry. If set, the new entry will be added with this ID. If not specified, '*' is used.
make_stream (bool, optional): If set to False, a new stream won't be created if no stream matches the given key.
trim (Optional[StreamTrimOptions]): If set, the add operation will also trim the older entries in the stream. See `StreamTrimOptions`.
"""
self.id = id
self.make_stream = make_stream
self.trim = trim

def to_args(self) -> List[str]:
"""
Convert options to arguments for Redis command.
Returns:
List[str]: List of arguments for Redis command.
"""
option_args = []
if not self.make_stream:
option_args.append("NOMKSTREAM")
if self.trim:
option_args.extend(self.trim.to_args())
option_args.append(self.id if self.id else "*")

return option_args


class GeoUnit(Enum):
"""
Enumeration representing distance units options for the `GEODIST` command.
Expand Down Expand Up @@ -1675,6 +1803,70 @@ async def type(self, key: str) -> str:
"""
return cast(str, await self._execute_command(RequestType.Type, [key]))

async def xadd(
self,
key: str,
values: List[Tuple[str, str]],
options: Optional[StreamAddOptions] = None,
) -> Optional[str]:
"""
Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created.
See https://valkey.io/commands/xadd for more details.
Args:
key (str): The key of the stream.
values (List[Tuple[str, str]]): Field-value pairs to be added to the entry.
options (Optional[StreamAddOptions]): Additional options for adding entries to the stream. Default to None. sSee `StreamAddOptions`.
Returns:
str: The id of the added entry, or None if `options.make_stream` is set to False and no stream with the matching `key` exists.
Example:
>>> await client.xadd("mystream", [("field", "value"), ("field2", "value2")])
"1615957011958-0" # Example stream entry ID.
>>> await client.xadd("non_existing_stream", [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1", make_stream=False))
None # The key doesn't exist, therefore, None is returned.
>>> await client.xadd("non_existing_stream", [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1"))
"0-1" # Returns the stream id.
"""
args = [key]
if options:
args.extend(options.to_args())
else:
args.append("*")
args.extend([field for pair in values for field in pair])

return cast(Optional[str], await self._execute_command(RequestType.XAdd, args))

async def xtrim(
self,
key: str,
options: StreamTrimOptions,
) -> int:
"""
Trims the stream stored at `key` by evicting older entries.
See https://valkey.io/commands/xtrim for more details.
Args:
key (str): The key of the stream.
options (StreamTrimOptions): Options detailing how to trim the stream. See `StreamTrimOptions`.
Returns:
int: TThe number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.
Example:
>>> await client.xadd("mystream", [("field", "value"), ("field2", "value2")], StreamAddOptions(id="0-1"))
>>> await client.xtrim("mystream", TrimByMinId(exact=True, threshold="0-2")))
1 # One entry was deleted from the stream.
"""
args = [key]
if options:
args.extend(options.to_args())

return cast(int, await self._execute_command(RequestType.XTrim, args))

async def geoadd(
self,
key: str,
Expand Down
51 changes: 51 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
GeoUnit,
InfoSection,
InsertPosition,
StreamAddOptions,
StreamTrimOptions,
UpdateOptions,
)
from glide.async_commands.sorted_set import (
Expand Down Expand Up @@ -1248,6 +1250,55 @@ def type(self: TTransaction, key: str) -> TTransaction:
"""
return self.append_command(RequestType.Type, [key])

def xadd(
self: TTransaction,
key: str,
values: List[Tuple[str, str]],
options: StreamAddOptions = StreamAddOptions(),
) -> TTransaction:
"""
Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created.
See https://valkey.io/commands/xadd for more details.
Args:
key (str): The key of the stream.
values (List[Tuple[str, str]]): Field-value pairs to be added to the entry.
options (Optional[StreamAddOptions]): Additional options for adding entries to the stream. Default to None. sSee `StreamAddOptions`.
Commands response:
str: The id of the added entry, or None if `options.make_stream` is set to False and no stream with the matching `key` exists.
"""
args = [key]
if options:
args.extend(options.to_args())
args.extend([field for pair in values for field in pair])

return self.append_command(RequestType.XAdd, args)

def xtrim(
self: TTransaction,
key: str,
options: StreamTrimOptions,
) -> TTransaction:
"""
Trims the stream stored at `key` by evicting older entries.
See https://valkey.io/commands/xtrim for more details.
Args:
key (str): The key of the stream.
options (StreamTrimOptions): Options detailing how to trim the stream. See `StreamTrimOptions`.
Commands response:
int: TThe number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.
"""
args = [key]
if options:
args.extend(options.to_args())

return self.append_command(RequestType.XTrim, args)

def geoadd(
self: TTransaction,
key: str,
Expand Down
56 changes: 55 additions & 1 deletion python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
InfBound,
InfoSection,
InsertPosition,
StreamAddOptions,
TrimByMaxLen,
TrimByMinId,
UpdateOptions,
)
from glide.async_commands.sorted_set import (
Expand Down Expand Up @@ -2064,7 +2067,7 @@ async def test_type(self, redis_client: TRedisClient):
assert (await redis_client.type(key)).lower() == "hash"
assert await redis_client.delete([key]) == 1

await redis_client.custom_command(["XADD", key, "*", "field", "value"])
await redis_client.xadd(key, [("field", "value")])
assert await redis_client.type(key) == "stream"
assert await redis_client.delete([key]) == 1

Expand Down Expand Up @@ -2123,6 +2126,57 @@ async def test_append(self, redis_client: TRedisClient):
assert await redis_client.append(key, value) == 10
assert await redis_client.get(key) == value * 2

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xadd_xtrim(self, redis_client: TRedisClient):
key = get_random_string(10)
field, field2 = get_random_string(10), get_random_string(10)

assert (
await redis_client.xadd(
key,
[(field, "foo"), (field2, "bar")],
StreamAddOptions(make_stream=False),
)
is None
)

assert (
await redis_client.xadd(
key, [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1")
)
== "0-1"
)

assert (
await redis_client.xadd(key, [(field, "foo2"), (field2, "bar2")])
) is not None
assert await redis_client.custom_command(["XLEN", key]) == 2

# This will trim the first entry.
id = await redis_client.xadd(
key,
[(field, "foo3"), (field2, "bar3")],
StreamAddOptions(trim=TrimByMaxLen(exact=True, threshold=2)),
)

assert id is not None
assert await redis_client.custom_command(["XLEN", key]) == 2

# This will trim the 2nd entry.
assert (
await redis_client.xadd(
key,
[(field, "foo4"), (field2, "bar4")],
StreamAddOptions(trim=TrimByMinId(exact=True, threshold=str(id))),
)
is not None
)
assert await redis_client.custom_command(["XLEN", key]) == 2

assert await redis_client.xtrim(key, TrimByMaxLen(threshold=1, exact=True)) == 1
assert await redis_client.custom_command(["XLEN", key]) == 1

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TRedisClient):
Expand Down
Loading

0 comments on commit 66b4916

Please sign in to comment.