diff --git a/CHANGELOG.md b/CHANGELOG.md index 365fa9f677..4ac1119b63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * Python: Added PFADD command ([#1315](https://github.com/aws/glide-for-redis/pull/1315)) * Python: Added ZMSCORE command ([#1357](https://github.com/aws/glide-for-redis/pull/1357)) * Python: Added HRANDFIELD command ([#1334](https://github.com/aws/glide-for-redis/pull/1334)) +* Python: Added BLPOP and BRPOP commands (TODO: add PR link) #### Fixes * Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203)) diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index f6dc886fc2..455f2afdbc 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -1002,6 +1002,36 @@ async def lpop_count(self, key: str, count: int) -> Optional[List[str]]: await self._execute_command(RequestType.LPop, [key, str(count)]), ) + async def blpop(self, keys: List[str], timeout: float) -> Optional[List[str]]: + """ + Pops an element from the head of the first list that is non-empty, with the given keys being checked in the + order that they are given. + Blocks the connection when there are no elements to pop from any of the given lists. + + See https://valkey.io/commands/blpop for details. + + Notes: + 1: BLPOP is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + 2: When in cluster mode, all keys must map to the same hash slot. + + Args: + keys (List[str]): The keys of the lists to pop from. + timeout (float): The number of seconds to wait for a blocking operation to complete. A + value of `0` will block indefinitely. + + Returns: + Optional[List[str]]: A two-element list containing the `key` from which the element was popped and the `value` of the + popped element, formatted as `[key, value]`. If no element could be popped and the timeout expired, returns `None`. + + Examples: + >>> await client.blpop(["list1", "list2"], 0.5) + ["list1", "element"] + """ + return cast( + Optional[List[str]], + await self._execute_command(RequestType.Blpop, keys + [str(timeout)]), + ) + async def lrange(self, key: str, start: int, end: int) -> List[str]: """ Retrieve the specified elements of the list stored at `key` within the given range. @@ -1164,6 +1194,36 @@ async def rpop_count(self, key: str, count: int) -> Optional[List[str]]: await self._execute_command(RequestType.RPop, [key, str(count)]), ) + async def brpop(self, keys: List[str], timeout: float) -> Optional[List[str]]: + """ + Pops an element from the tail of the first list that is non-empty, with the given keys being checked in the + order that they are given. + Blocks the connection when there are no elements to pop from any of the given lists. + + See https://valkey.io/commands/brpop for details. + + Notes: + 1: BRPOP is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + 2: When in cluster mode, all keys must map to the same hash slot. + + Args: + keys (List[str]): The keys of the lists to pop from. + timeout (float): The number of seconds to wait for a blocking operation to complete. A + value of `0` will block indefinitely. + + Returns: + Optional[List[str]]: A two-element list containing the `key` from which the element was popped and the `value` of the + popped element, formatted as `[key, value]`. If no element could be popped and the timeout expired, returns `None`. + + Examples: + >>> await client.brpop(["list1", "list2"], 0.5) + ["list1", "element"] + """ + return cast( + Optional[List[str]], + await self._execute_command(RequestType.Brpop, keys + [str(timeout)]), + ) + async def linsert( self, key: str, position: InsertPosition, pivot: str, element: str ) -> int: diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index a90527e586..c80dc7d471 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -725,6 +725,29 @@ def lpop_count(self: TTransaction, key: str, count: int) -> TTransaction: """ return self.append_command(RequestType.LPop, [key, str(count)]) + def blpop(self: TTransaction, keys: List[str], timeout: float) -> TTransaction: + """ + Pops an element from the head of the first list that is non-empty, with the given keys being checked in the + order that they are given. + Blocks the connection when there are no elements to pop from any of the given lists. + + See https://valkey.io/commands/blpop for details. + + Note: BLPOP is a client blocking command, see + https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best + practices. + + Args: + keys (List[str]): The keys of the lists to pop from. + timeout (float): The number of seconds to wait for a blocking operation to complete. A + value of `0` will block indefinitely. + + Command response: + Optional[List[str]]: A two-element list containing the `key` from which the element was popped and the `value` of the + popped element, formatted as `[key, value]`. If no element could be popped and the timeout expired, returns `None`. + """ + return self.append_command(RequestType.Blpop, keys + [str(timeout)]) + def lrange(self: TTransaction, key: str, start: int, end: int) -> TTransaction: """ Retrieve the specified elements of the list stored at `key` within the given range. @@ -831,6 +854,29 @@ def rpop_count(self: TTransaction, key: str, count: int) -> TTransaction: """ return self.append_command(RequestType.RPop, [key, str(count)]) + def brpop(self: TTransaction, keys: List[str], timeout: float) -> TTransaction: + """ + Pops an element from the tail of the first list that is non-empty, with the given keys being checked in the + order that they are given. + Blocks the connection when there are no elements to pop from any of the given lists. + + See https://valkey.io/commands/brpop for details. + + Note: BRPOP is a client blocking command, see + https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best + practices. + + Args: + keys (List[str]): The keys of the lists to pop from. + timeout (float): The number of seconds to wait for a blocking operation to complete. A + value of `0` will block indefinitely. + + Command response: + Optional[List[str]]: A two-element list containing the `key` from which the element was popped and the `value` of the + popped element, formatted as `[key, value]`. If no element could be popped and the timeout expired, returns `None`. + """ + return self.append_command(RequestType.Brpop, keys + [str(timeout)]) + def linsert( self: TTransaction, key: str, position: InsertPosition, pivot: str, element: str ) -> TTransaction: diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index eee211a238..66b4562a9e 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -927,6 +927,40 @@ async def test_lpushx(self, redis_client: TRedisClient): with pytest.raises(RequestError): await redis_client.lpushx(key1, []) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_blpop(self, redis_client: TRedisClient): + key1 = f"{{test}}-1-f{get_random_string(10)}" + key2 = f"{{test}}-2-f{get_random_string(10)}" + value1 = "value1" + value2 = "value2" + value_list = [value1, value2] + + assert await redis_client.lpush(key1, value_list) == 2 + # ensure that command doesn't time out even if timeout > request timeout (250ms by default) + assert await redis_client.blpop([key1, key2], 0.5) == [key1, value2] + + assert await redis_client.blpop(["non_existent_key"], 0.5) is None + + # key exists, but not a list + assert await redis_client.set("foo", "bar") + with pytest.raises(RequestError): + await redis_client.blpop(["foo"], 0.001) + + # same-slot requirement + if isinstance(redis_client, RedisClusterClient): + with pytest.raises(RequestError) as e: + await redis_client.blpop(["abc", "zxy", "lkn"], 0.5) + assert "CrossSlot" in str(e) + + async def endless_blpop_call(): + await redis_client.blpop(["non_existent_key"], 0) + + # blpop is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to + # avoid having the test block forever + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(endless_blpop_call(), timeout=3) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_lindex(self, redis_client: TRedisClient): @@ -985,6 +1019,40 @@ async def test_rpushx(self, redis_client: TRedisClient): with pytest.raises(RequestError): await redis_client.rpushx(key2, []) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_brpop(self, redis_client: TRedisClient): + key1 = f"{{test}}-1-f{get_random_string(10)}" + key2 = f"{{test}}-2-f{get_random_string(10)}" + value1 = "value1" + value2 = "value2" + value_list = [value1, value2] + + assert await redis_client.lpush(key1, value_list) == 2 + # ensure that command doesn't time out even if timeout > request timeout (250ms by default) + assert await redis_client.brpop([key1, key2], 0.5) == [key1, value1] + + assert await redis_client.brpop(["non_existent_key"], 0.5) is None + + # key exists, but not a list + assert await redis_client.set("foo", "bar") + with pytest.raises(RequestError): + await redis_client.brpop(["foo"], 0.001) + + # same-slot requirement + if isinstance(redis_client, RedisClusterClient): + with pytest.raises(RequestError) as e: + await redis_client.brpop(["abc", "zxy", "lkn"], 0.5) + assert "CrossSlot" in str(e) + + async def endless_brpop_call(): + await redis_client.brpop(["non_existent_key"], 0) + + # brpop is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to + # avoid having the test block forever + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(endless_brpop_call(), timeout=3) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_linsert(self, redis_client: TRedisClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 24aba819ff..92128f1c36 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -37,11 +37,13 @@ async def transaction_test( key6 = "{{{}}}:{}".format(keyslot, get_random_string(3)) key7 = "{{{}}}:{}".format(keyslot, get_random_string(3)) key8 = "{{{}}}:{}".format(keyslot, get_random_string(3)) - key9 = "{{{}}}:{}".format(keyslot, get_random_string(3)) + key9 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # list key10 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # hyper log log + key13 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # geo value = datetime.now(timezone.utc).strftime("%m/%d/%Y, %H:%M:%S") value2 = get_random_string(5) + value3 = get_random_string(5) args: List[TResult] = [] transaction.dbsize() @@ -168,6 +170,12 @@ async def transaction_test( args.append(0) transaction.lpushx(key9, ["_"]) args.append(0) + transaction.lpush(key9, [value, value2, value3]) + args.append(3) + transaction.blpop([key9], 1) + args.append([key9, value3]) + transaction.brpop([key9], 1) + args.append([key9, value]) transaction.sadd(key7, ["foo", "bar"]) args.append(2) @@ -224,18 +232,18 @@ async def transaction_test( args.append(1) transaction.geoadd( - key9, + key13, { "Palermo": GeospatialData(13.361389, 38.115556), "Catania": GeospatialData(15.087269, 37.502669), }, ) args.append(2) - transaction.geodist(key9, "Palermo", "Catania") + transaction.geodist(key13, "Palermo", "Catania") args.append(166274.1516) - transaction.geohash(key9, ["Palermo", "Catania", "Place"]) + transaction.geohash(key13, ["Palermo", "Catania", "Place"]) args.append(["sqc8b49rny0", "sqdtr74hyu0", None]) - transaction.geopos(key9, ["Palermo", "Catania", "Place"]) + transaction.geopos(key13, ["Palermo", "Catania", "Place"]) # The comparison allows for a small tolerance level due to potential precision errors in floating-point calculations # No worries, Python can handle it, therefore, this shouldn't fail args.append(