Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add BLPOP and BRPOP commands #196

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Python: Added PFADD command ([#1315](https://github.com/aws/glide-for-redis/pull/1315))
* Python: Added ZMSCORE command ([#1357](https://github.com/aws/glide-for-redis/pull/1357))
* Python: Added HRANDFIELD command ([#1334](https://github.com/aws/glide-for-redis/pull/1334))
* Python: Added BLPOP and BRPOP commands (TODO: add PR link)

#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
Expand Down
60 changes: 60 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,36 @@ async def lpop_count(self, key: str, count: int) -> Optional[List[str]]:
await self._execute_command(RequestType.LPop, [key, str(count)]),
)

async def blpop(self, keys: List[str], timeout: float) -> Optional[List[str]]:
"""
Pops an element from the head of the first list that is non-empty, with the given keys being checked in the
order that they are given.
Blocks the connection when there are no elements to pop from any of the given lists.

See https://valkey.io/commands/blpop for details.

Notes:
1: BLPOP is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
2: When in cluster mode, all keys must map to the same hash slot.

Args:
keys (List[str]): The keys of the lists to pop from.
timeout (float): The number of seconds to wait for a blocking operation to complete. A
value of `0` will block indefinitely.

Returns:
Optional[List[str]]: A two-element list containing the `key` from which the element was popped and the `value` of the
popped element, formatted as `[key, value]`. If no element could be popped and the timeout expired, returns `None`.

Examples:
>>> await client.blpop(["list1", "list2"], 0.5)
["list1", "element"]
"""
return cast(
Optional[List[str]],
await self._execute_command(RequestType.Blpop, keys + [str(timeout)]),
)

async def lrange(self, key: str, start: int, end: int) -> List[str]:
"""
Retrieve the specified elements of the list stored at `key` within the given range.
Expand Down Expand Up @@ -1164,6 +1194,36 @@ async def rpop_count(self, key: str, count: int) -> Optional[List[str]]:
await self._execute_command(RequestType.RPop, [key, str(count)]),
)

async def brpop(self, keys: List[str], timeout: float) -> Optional[List[str]]:
"""
Pops an element from the tail of the first list that is non-empty, with the given keys being checked in the
order that they are given.
Blocks the connection when there are no elements to pop from any of the given lists.

See https://valkey.io/commands/brpop for details.

Notes:
1: BRPOP is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
2: When in cluster mode, all keys must map to the same hash slot.

Args:
keys (List[str]): The keys of the lists to pop from.
timeout (float): The number of seconds to wait for a blocking operation to complete. A
value of `0` will block indefinitely.

Returns:
Optional[List[str]]: A two-element list containing the `key` from which the element was popped and the `value` of the
popped element, formatted as `[key, value]`. If no element could be popped and the timeout expired, returns `None`.

Examples:
>>> await client.brpop(["list1", "list2"], 0.5)
["list1", "element"]
"""
return cast(
Optional[List[str]],
await self._execute_command(RequestType.Brpop, keys + [str(timeout)]),
)

async def linsert(
self, key: str, position: InsertPosition, pivot: str, element: str
) -> int:
Expand Down
46 changes: 46 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,29 @@ def lpop_count(self: TTransaction, key: str, count: int) -> TTransaction:
"""
return self.append_command(RequestType.LPop, [key, str(count)])

def blpop(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
"""
Pops an element from the head of the first list that is non-empty, with the given keys being checked in the
order that they are given.
Blocks the connection when there are no elements to pop from any of the given lists.

See https://valkey.io/commands/blpop for details.

Note: BLPOP is a client blocking command, see
https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best
practices.

Args:
keys (List[str]): The keys of the lists to pop from.
timeout (float): The number of seconds to wait for a blocking operation to complete. A
value of `0` will block indefinitely.

Command response:
Optional[List[str]]: A two-element list containing the `key` from which the element was popped and the `value` of the
popped element, formatted as `[key, value]`. If no element could be popped and the timeout expired, returns `None`.
"""
return self.append_command(RequestType.Blpop, keys + [str(timeout)])

def lrange(self: TTransaction, key: str, start: int, end: int) -> TTransaction:
"""
Retrieve the specified elements of the list stored at `key` within the given range.
Expand Down Expand Up @@ -831,6 +854,29 @@ def rpop_count(self: TTransaction, key: str, count: int) -> TTransaction:
"""
return self.append_command(RequestType.RPop, [key, str(count)])

def brpop(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
"""
Pops an element from the tail of the first list that is non-empty, with the given keys being checked in the
order that they are given.
Blocks the connection when there are no elements to pop from any of the given lists.

See https://valkey.io/commands/brpop for details.

Note: BRPOP is a client blocking command, see
https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best
practices.

Args:
keys (List[str]): The keys of the lists to pop from.
timeout (float): The number of seconds to wait for a blocking operation to complete. A
value of `0` will block indefinitely.

Command response:
Optional[List[str]]: A two-element list containing the `key` from which the element was popped and the `value` of the
popped element, formatted as `[key, value]`. If no element could be popped and the timeout expired, returns `None`.
"""
return self.append_command(RequestType.Brpop, keys + [str(timeout)])

def linsert(
self: TTransaction, key: str, position: InsertPosition, pivot: str, element: str
) -> TTransaction:
Expand Down
68 changes: 68 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,40 @@ async def test_lpushx(self, redis_client: TRedisClient):
with pytest.raises(RequestError):
await redis_client.lpushx(key1, [])

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_blpop(self, redis_client: TRedisClient):
key1 = f"{{test}}-1-f{get_random_string(10)}"
key2 = f"{{test}}-2-f{get_random_string(10)}"
value1 = "value1"
value2 = "value2"
value_list = [value1, value2]

assert await redis_client.lpush(key1, value_list) == 2
# ensure that command doesn't time out even if timeout > request timeout (250ms by default)
assert await redis_client.blpop([key1, key2], 0.5) == [key1, value2]

assert await redis_client.blpop(["non_existent_key"], 0.5) is None

# key exists, but not a list
assert await redis_client.set("foo", "bar")
with pytest.raises(RequestError):
await redis_client.blpop(["foo"], 0.001)

# same-slot requirement
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.blpop(["abc", "zxy", "lkn"], 0.5)
assert "CrossSlot" in str(e)

async def endless_blpop_call():
await redis_client.blpop(["non_existent_key"], 0)

# blpop is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_blpop_call(), timeout=3)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_lindex(self, redis_client: TRedisClient):
Expand Down Expand Up @@ -985,6 +1019,40 @@ async def test_rpushx(self, redis_client: TRedisClient):
with pytest.raises(RequestError):
await redis_client.rpushx(key2, [])

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_brpop(self, redis_client: TRedisClient):
key1 = f"{{test}}-1-f{get_random_string(10)}"
key2 = f"{{test}}-2-f{get_random_string(10)}"
value1 = "value1"
value2 = "value2"
value_list = [value1, value2]

assert await redis_client.lpush(key1, value_list) == 2
# ensure that command doesn't time out even if timeout > request timeout (250ms by default)
assert await redis_client.brpop([key1, key2], 0.5) == [key1, value1]

assert await redis_client.brpop(["non_existent_key"], 0.5) is None

# key exists, but not a list
assert await redis_client.set("foo", "bar")
with pytest.raises(RequestError):
await redis_client.brpop(["foo"], 0.001)

# same-slot requirement
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.brpop(["abc", "zxy", "lkn"], 0.5)
assert "CrossSlot" in str(e)

async def endless_brpop_call():
await redis_client.brpop(["non_existent_key"], 0)

# brpop is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_brpop_call(), timeout=3)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_linsert(self, redis_client: TRedisClient):
Expand Down
18 changes: 13 additions & 5 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ async def transaction_test(
key6 = "{{{}}}:{}".format(keyslot, get_random_string(3))
key7 = "{{{}}}:{}".format(keyslot, get_random_string(3))
key8 = "{{{}}}:{}".format(keyslot, get_random_string(3))
key9 = "{{{}}}:{}".format(keyslot, get_random_string(3))
key9 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # list
key10 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # hyper log log
key13 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # geo

value = datetime.now(timezone.utc).strftime("%m/%d/%Y, %H:%M:%S")
value2 = get_random_string(5)
value3 = get_random_string(5)
args: List[TResult] = []

transaction.dbsize()
Expand Down Expand Up @@ -168,6 +170,12 @@ async def transaction_test(
args.append(0)
transaction.lpushx(key9, ["_"])
args.append(0)
transaction.lpush(key9, [value, value2, value3])
args.append(3)
transaction.blpop([key9], 1)
args.append([key9, value3])
transaction.brpop([key9], 1)
args.append([key9, value])

transaction.sadd(key7, ["foo", "bar"])
args.append(2)
Expand Down Expand Up @@ -224,18 +232,18 @@ async def transaction_test(
args.append(1)

transaction.geoadd(
key9,
key13,
{
"Palermo": GeospatialData(13.361389, 38.115556),
"Catania": GeospatialData(15.087269, 37.502669),
},
)
args.append(2)
transaction.geodist(key9, "Palermo", "Catania")
transaction.geodist(key13, "Palermo", "Catania")
args.append(166274.1516)
transaction.geohash(key9, ["Palermo", "Catania", "Place"])
transaction.geohash(key13, ["Palermo", "Catania", "Place"])
args.append(["sqc8b49rny0", "sqdtr74hyu0", None])
transaction.geopos(key9, ["Palermo", "Catania", "Place"])
transaction.geopos(key13, ["Palermo", "Catania", "Place"])
# The comparison allows for a small tolerance level due to potential precision errors in floating-point calculations
# No worries, Python can handle it, therefore, this shouldn't fail
args.append(
Expand Down
Loading