From 2288218bee4196c82a709dd291030e6b1320a602 Mon Sep 17 00:00:00 2001 From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com> Date: Tue, 11 Apr 2023 15:00:57 +0530 Subject: [PATCH 1/6] feat: adding support for cluster myshardid --- redis/cluster.py | 6 ++++++ redis/commands/cluster.py | 8 ++++++++ tests/test_cluster.py | 7 +++++++ 3 files changed, 21 insertions(+) diff --git a/redis/cluster.py b/redis/cluster.py index 5e6e7da546..fc8a8c3dc8 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -112,6 +112,11 @@ def parse_cluster_shards(resp, **options): return shards +def parse_cluster_myshardid(resp, **options): + """ + Parse CLUSTER MYSHARDID response. + """ + return resp.decode('utf-8') PRIMARY = "primary" REPLICA = "replica" @@ -341,6 +346,7 @@ class AbstractRedisCluster: CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { "CLUSTER SLOTS": parse_cluster_slots, "CLUSTER SHARDS": parse_cluster_shards, + "CLUSTER MYSHARDID": parse_cluster_myshardid } RESULT_CALLBACKS = dict_merge( diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index a23a94a3d3..67a1e4104d 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -633,6 +633,14 @@ def cluster_shards(self, target_nodes=None): For more information see https://redis.io/commands/cluster-shards """ return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes) + + def cluster_myshardid(self, target_nodes=None): + """ + Returns details about the shards of the cluster. + + For more information see https://redis.io/commands/cluster-shards + """ + return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes) def cluster_links(self, target_node: "TargetNodesT") -> ResponseT: """ diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 58f9b77d7d..289ec3bfe7 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -1046,6 +1046,13 @@ def test_cluster_shards(self, r): for attribute in node.keys(): assert attribute in attributes + @skip_if_server_version_lt("7.2.0") + @skip_if_redis_enterprise() + def test_cluster_myshardid(self, r): + myshardid = r.cluster_myshardid() + assert isinstance(myshardid, str) + assert len(myshardid) > 0 + @skip_if_redis_enterprise() def test_cluster_addslots(self, r): node = r.get_random_node() From 667cd9843d5e3d8a801500a952eee9372f991088 Mon Sep 17 00:00:00 2001 From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com> Date: Tue, 11 Apr 2023 16:09:43 +0530 Subject: [PATCH 2/6] lint fix --- redis/cluster.py | 106 +++++++++++++++++++------------------- redis/commands/cluster.py | 77 ++++++++++++++------------- tests/test_cluster.py | 86 +++++++++++++++---------------- 3 files changed, 133 insertions(+), 136 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index fc8a8c3dc8..54c369a78a 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -74,7 +74,7 @@ def parse_pubsub_numsub(command, res, **options): def parse_cluster_slots( - resp: Any, **options: Any + resp: Any, **options: Any ) -> Dict[Tuple[int, int], Dict[str, Any]]: current_host = options.get("current_host", "") @@ -112,12 +112,14 @@ def parse_cluster_shards(resp, **options): return shards + def parse_cluster_myshardid(resp, **options): """ Parse CLUSTER MYSHARDID response. """ return resp.decode('utf-8') + PRIMARY = "primary" REPLICA = "replica" SLOT_ID = "slot-id" @@ -461,18 +463,18 @@ class initializer. In the case of conflicting arguments, querystring return cls(url=url, **kwargs) def __init__( - self, - host: Optional[str] = None, - port: int = 6379, - startup_nodes: Optional[List["ClusterNode"]] = None, - cluster_error_retry_attempts: int = 3, - retry: Optional["Retry"] = None, - require_full_coverage: bool = False, - reinitialize_steps: int = 5, - read_from_replicas: bool = False, - dynamic_startup_nodes: bool = True, - url: Optional[str] = None, - **kwargs, + self, + host: Optional[str] = None, + port: int = 6379, + startup_nodes: Optional[List["ClusterNode"]] = None, + cluster_error_retry_attempts: int = 3, + retry: Optional["Retry"] = None, + require_full_coverage: bool = False, + reinitialize_steps: int = 5, + read_from_replicas: bool = False, + dynamic_startup_nodes: bool = True, + url: Optional[str] = None, + **kwargs, ): """ Initialize a new RedisCluster client. @@ -768,14 +770,14 @@ def pipeline(self, transaction=None, shard_hint=None): ) def lock( - self, - name, - timeout=None, - sleep=0.1, - blocking=True, - blocking_timeout=None, - lock_class=None, - thread_local=True, + self, + name, + timeout=None, + sleep=0.1, + blocking=True, + blocking_timeout=None, + lock_class=None, + thread_local=True, ): """ Return a new Lock object using key ``name`` that mimics @@ -941,7 +943,7 @@ def determine_slot(self, *args): if len(args) <= 2: raise RedisClusterException(f"Invalid args in command: {args}") num_actual_keys = args[2] - eval_keys = args[3 : 3 + num_actual_keys] + eval_keys = args[3: 3 + num_actual_keys] # if there are 0 keys, that means the script can be run on any node # so we can just return a random slot if len(eval_keys) == 0: @@ -1058,8 +1060,8 @@ def execute_command(self, *args, **kwargs): f"No targets were found to execute {args} command on" ) if ( - len(target_nodes) == 1 - and target_nodes[0] == self.get_default_node() + len(target_nodes) == 1 + and target_nodes[0] == self.get_default_node() ): is_default_node = True for node in target_nodes: @@ -1268,14 +1270,14 @@ def reset(self) -> None: class NodesManager: def __init__( - self, - startup_nodes, - from_url=False, - require_full_coverage=False, - lock=None, - dynamic_startup_nodes=True, - connection_pool_class=ConnectionPool, - **kwargs, + self, + startup_nodes, + from_url=False, + require_full_coverage=False, + lock=None, + dynamic_startup_nodes=True, + connection_pool_class=ConnectionPool, + **kwargs, ): self.nodes_cache = {} self.slots_cache = {} @@ -1374,9 +1376,9 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None): primary_name, len(self.slots_cache[slot]) ) elif ( - server_type is None - or server_type == PRIMARY - or len(self.slots_cache[slot]) == 1 + server_type is None + or server_type == PRIMARY + or len(self.slots_cache[slot]) == 1 ): # return a primary node_idx = 0 @@ -1496,9 +1498,9 @@ def initialize(self): # If there's only one server in the cluster, its ``host`` is '' # Fix it to the host in startup_nodes if ( - len(cluster_slots) == 1 - and len(cluster_slots[0][2][0]) == 0 - and len(self.startup_nodes) == 1 + len(cluster_slots) == 1 + and len(cluster_slots[0][2][0]) == 0 + and len(self.startup_nodes) == 1 ): cluster_slots[0][2][0] = startup_node.host @@ -1740,17 +1742,17 @@ class ClusterPipeline(RedisCluster): ) def __init__( - self, - nodes_manager: "NodesManager", - commands_parser: "CommandsParser", - result_callbacks: Optional[Dict[str, Callable]] = None, - cluster_response_callbacks: Optional[Dict[str, Callable]] = None, - startup_nodes: Optional[List["ClusterNode"]] = None, - read_from_replicas: bool = False, - cluster_error_retry_attempts: int = 3, - reinitialize_steps: int = 5, - lock=None, - **kwargs, + self, + nodes_manager: "NodesManager", + commands_parser: "CommandsParser", + result_callbacks: Optional[Dict[str, Callable]] = None, + cluster_response_callbacks: Optional[Dict[str, Callable]] = None, + startup_nodes: Optional[List["ClusterNode"]] = None, + read_from_replicas: bool = False, + cluster_error_retry_attempts: int = 3, + reinitialize_steps: int = 5, + lock=None, + **kwargs, ): """ """ self.command_stack = [] @@ -1758,7 +1760,7 @@ def __init__( self.commands_parser = commands_parser self.refresh_table_asap = False self.result_callbacks = ( - result_callbacks or self.__class__.RESULT_CALLBACKS.copy() + result_callbacks or self.__class__.RESULT_CALLBACKS.copy() ) self.startup_nodes = startup_nodes if startup_nodes else [] self.read_from_replicas = read_from_replicas @@ -1881,7 +1883,7 @@ def reset(self): # self.connection = None def send_cluster_commands( - self, stack, raise_on_error=True, allow_redirections=True + self, stack, raise_on_error=True, allow_redirections=True ): """ Wrapper for CLUSTERDOWN error handling. @@ -1918,7 +1920,7 @@ def send_cluster_commands( raise e def _send_cluster_commands( - self, stack, raise_on_error=True, allow_redirections=True + self, stack, raise_on_error=True, allow_redirections=True ): """ Send a bunch of cluster commands to the redis cluster. diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 67a1e4104d..4c64269ea9 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -45,7 +45,6 @@ if TYPE_CHECKING: from redis.asyncio.cluster import TargetNodesT - # Not complete, but covers the major ones # https://redis.io/commands READ_COMMANDS = frozenset( @@ -111,7 +110,7 @@ def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]] return slots_to_keys def _partition_pairs_by_slot( - self, mapping: Mapping[AnyKeyT, EncodableT] + self, mapping: Mapping[AnyKeyT, EncodableT] ) -> Dict[int, List[EncodableT]]: """Split pairs into a dictionary that maps a slot to a list of pairs.""" @@ -123,7 +122,7 @@ def _partition_pairs_by_slot( return slots_to_pairs def _execute_pipeline_by_slot( - self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] + self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] ) -> List[Any]: read_from_replicas = self.read_from_replicas and command in READ_COMMANDS pipe = self.pipeline() @@ -140,10 +139,10 @@ def _execute_pipeline_by_slot( return pipe.execute() def _reorder_keys_by_command( - self, - keys: Iterable[KeyT], - slots_to_args: Mapping[int, Iterable[EncodableT]], - responses: Iterable[Any], + self, + keys: Iterable[KeyT], + slots_to_args: Mapping[int, Iterable[EncodableT]], + responses: Iterable[Any], ) -> List[Any]: results = { k: v @@ -319,7 +318,7 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) async def _execute_pipeline_by_slot( - self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] + self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] ) -> List[Any]: if self._initialize: await self.initialize() @@ -382,7 +381,7 @@ def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: return self.execute_command("CLUSTER MYID", target_nodes=target_node) def cluster_addslots( - self, target_node: "TargetNodesT", *slots: EncodableT + self, target_node: "TargetNodesT", *slots: EncodableT ) -> ResponseT: """ Assign new hash slots to receiving node. Sends to specified node. @@ -397,7 +396,7 @@ def cluster_addslots( ) def cluster_addslotsrange( - self, target_node: "TargetNodesT", *slots: EncodableT + self, target_node: "TargetNodesT", *slots: EncodableT ) -> ResponseT: """ Similar to the CLUSTER ADDSLOTS command. @@ -455,7 +454,7 @@ def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) def cluster_failover( - self, target_node: "TargetNodesT", option: Optional[str] = None + self, target_node: "TargetNodesT", option: Optional[str] = None ) -> ResponseT: """ Forces a slave to perform a manual failover of its master @@ -498,7 +497,7 @@ def cluster_keyslot(self, key: str) -> ResponseT: return self.execute_command("CLUSTER KEYSLOT", key) def cluster_meet( - self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None + self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Force a node cluster to handshake with another node. @@ -520,7 +519,7 @@ def cluster_nodes(self) -> ResponseT: return self.execute_command("CLUSTER NODES") def cluster_replicate( - self, target_nodes: "TargetNodesT", node_id: str + self, target_nodes: "TargetNodesT", node_id: str ) -> ResponseT: """ Reconfigure a node as a slave of the specified master node @@ -532,7 +531,7 @@ def cluster_replicate( ) def cluster_reset( - self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None + self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Reset a Redis Cluster node @@ -547,7 +546,7 @@ def cluster_reset( ) def cluster_save_config( - self, target_nodes: Optional["TargetNodesT"] = None + self, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Forces the node to save cluster state on disk @@ -565,7 +564,7 @@ def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) def cluster_set_config_epoch( - self, epoch: int, target_nodes: Optional["TargetNodesT"] = None + self, epoch: int, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Set the configuration epoch in a new node @@ -577,7 +576,7 @@ def cluster_set_config_epoch( ) def cluster_setslot( - self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str + self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str ) -> ResponseT: """ Bind an hash slot to a specific node @@ -606,7 +605,7 @@ def cluster_setslot_stable(self, slot_id: int) -> ResponseT: return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") def cluster_replicas( - self, node_id: str, target_nodes: Optional["TargetNodesT"] = None + self, node_id: str, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Provides a list of replica nodes replicating from the specified primary @@ -633,7 +632,7 @@ def cluster_shards(self, target_nodes=None): For more information see https://redis.io/commands/cluster-shards """ return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes) - + def cluster_myshardid(self, target_nodes=None): """ Returns details about the shards of the cluster. @@ -727,16 +726,16 @@ class ClusterDataAccessCommands(DataAccessCommands): """ def stralgo( - self, - algo: Literal["LCS"], - value1: KeyT, - value2: KeyT, - specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", - len: bool = False, - idx: bool = False, - minmatchlen: Optional[int] = None, - withmatchlen: bool = False, - **kwargs, + self, + algo: Literal["LCS"], + value1: KeyT, + value2: KeyT, + specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", + len: bool = False, + idx: bool = False, + minmatchlen: Optional[int] = None, + withmatchlen: bool = False, + **kwargs, ) -> ResponseT: """ Implements complex algorithms that operate on strings. @@ -774,11 +773,11 @@ def stralgo( ) def scan_iter( - self, - match: Optional[PatternT] = None, - count: Optional[int] = None, - _type: Optional[str] = None, - **kwargs, + self, + match: Optional[PatternT] = None, + count: Optional[int] = None, + _type: Optional[str] = None, + **kwargs, ) -> Iterator: # Do the first query with cursor=0 for all nodes cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) @@ -820,11 +819,11 @@ class AsyncClusterDataAccessCommands( """ async def scan_iter( - self, - match: Optional[PatternT] = None, - count: Optional[int] = None, - _type: Optional[str] = None, - **kwargs, + self, + match: Optional[PatternT] = None, + count: Optional[int] = None, + _type: Optional[str] = None, + **kwargs, ) -> AsyncIterator: # Do the first query with cursor=0 for all nodes cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 289ec3bfe7..2ecb2cac87 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -107,7 +107,7 @@ def execute_command(*_args, **_kwargs): execute_command_mock.side_effect = execute_command with patch.object( - CommandsParser, "initialize", autospec=True + CommandsParser, "initialize", autospec=True ) as cmd_parser_initialize: def cmd_init_mock(self, r): @@ -233,8 +233,8 @@ def test_startup_nodes(self): ] cluster = get_mocked_redis_client(startup_nodes=startup_nodes) assert ( - cluster.get_node(host=default_host, port=port_1) is not None - and cluster.get_node(host=default_host, port=port_2) is not None + cluster.get_node(host=default_host, port=port_1) is not None + and cluster.get_node(host=default_host, port=port_2) is not None ) def test_empty_startup_nodes(self): @@ -251,7 +251,6 @@ def test_empty_startup_nodes(self): def test_from_url(self, r): redis_url = f"redis://{default_host}:{default_port}/0" with patch.object(RedisCluster, "from_url") as from_url: - def from_url_mocked(_url, **_kwargs): return get_mocked_redis_client(url=_url, **_kwargs) @@ -346,7 +345,6 @@ def test_ask_redirection(self, r): """ redirect_node = r.get_nodes()[0] with patch.object(Redis, "parse_response") as parse_response: - def ask_redirect_effect(connection, *args, **options): def ok_response(connection, *args, **options): assert connection.host == redirect_node.host @@ -437,7 +435,7 @@ def test_refresh_using_specific_nodes(self, request): with patch.object(Redis, "parse_response") as parse_response: with patch.object(NodesManager, "initialize", autospec=True) as initialize: with patch.multiple( - Connection, send_command=DEFAULT, connect=DEFAULT, can_read=DEFAULT + Connection, send_command=DEFAULT, connect=DEFAULT, can_read=DEFAULT ) as mocks: # simulate 7006 as a failed node def parse_response_mock(connection, command_name, **options): @@ -481,7 +479,7 @@ def map_7007(self): mocks["send_command"].return_value = "MOCK_OK" mocks["connect"].return_value = None with patch.object( - CommandsParser, "initialize", autospec=True + CommandsParser, "initialize", autospec=True ) as cmd_parser_initialize: def cmd_init_mock(self, r): @@ -514,15 +512,14 @@ def cmd_init_mock(self, r): def test_reading_from_replicas_in_round_robin(self): with patch.multiple( - Connection, - send_command=DEFAULT, - read_response=DEFAULT, - _connect=DEFAULT, - can_read=DEFAULT, - on_connect=DEFAULT, + Connection, + send_command=DEFAULT, + read_response=DEFAULT, + _connect=DEFAULT, + can_read=DEFAULT, + on_connect=DEFAULT, ) as mocks: with patch.object(Redis, "parse_response") as parse_response: - def parse_response_mock_first(connection, *args, **options): # Primary assert connection.port == 7001 @@ -591,8 +588,8 @@ def test_keyslot(self, r): def test_get_node_name(self): assert ( - get_node_name(default_host, default_port) - == f"{default_host}:{default_port}" + get_node_name(default_host, default_port) + == f"{default_host}:{default_port}" ) def test_all_nodes(self, r): @@ -626,7 +623,6 @@ def test_cluster_down_overreaches_retry_attempts(self, error): and then raise the exception """ with patch.object(RedisCluster, "_execute_command") as execute_command: - def raise_error(target_node, *args, **kwargs): execute_command.failed_calls += 1 raise error("mocked error") @@ -794,10 +790,10 @@ def test_cluster_retry_object(self, r) -> None: retry = Retry(ExponentialBackoff(10, 5), 5) rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry) assert ( - rc_custom_retry.get_node("127.0.0.1", 16379) - .redis_connection.get_retry() - ._retries - == retry._retries + rc_custom_retry.get_node("127.0.0.1", 16379) + .redis_connection.get_retry() + ._retries + == retry._retries ) def test_replace_cluster_node(self, r) -> None: @@ -832,8 +828,8 @@ class TestClusterRedisCommands: def test_case_insensitive_command_names(self, r): assert ( - r.cluster_response_callbacks["cluster slots"] - == r.cluster_response_callbacks["CLUSTER SLOTS"] + r.cluster_response_callbacks["cluster slots"] + == r.cluster_response_callbacks["CLUSTER SLOTS"] ) def test_get_and_set(self, r): @@ -1167,8 +1163,8 @@ def test_cluster_nodes(self, r): assert len(nodes) == 7 assert nodes.get("172.17.0.7:7006") is not None assert ( - nodes.get("172.17.0.7:7006").get("node_id") - == "c8253bae761cb1ecb2b61857d85dfe455a0fec8b" + nodes.get("172.17.0.7:7006").get("node_id") + == "c8253bae761cb1ecb2b61857d85dfe455a0fec8b" ) @skip_if_redis_enterprise() @@ -1291,8 +1287,8 @@ def test_cluster_replicas(self, r): assert replicas.get("127.0.0.1:6377") is not None assert replicas.get("127.0.0.1:6378") is not None assert ( - replicas.get("127.0.0.1:6378").get("node_id") - == "r4xfga22229cf3c652b6fca0d09ff69f3e0d4d" + replicas.get("127.0.0.1:6378").get("node_id") + == "r4xfga22229cf3c652b6fca0d09ff69f3e0d4d" ) @skip_if_server_version_lt("7.0.0") @@ -1361,8 +1357,8 @@ def _init_slowlog_test(self, r, node): def _teardown_slowlog_test(self, r, node, prev_limit): assert ( - r.config_set("slowlog-log-slower-than", prev_limit, target_nodes=node) - is True + r.config_set("slowlog-log-slower-than", prev_limit, target_nodes=node) + is True ) def test_slowlog_get(self, r, slowlog): @@ -1781,8 +1777,8 @@ def test_cluster_zinterstore_max(self, r): r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) assert ( - r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX") - == 2 + r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX") + == 2 ) assert r.zrange("{foo}d", 0, -1, withscores=True) == [(b"a3", 5), (b"a1", 6)] @@ -1791,8 +1787,8 @@ def test_cluster_zinterstore_min(self, r): r.zadd("{foo}b", {"a1": 2, "a2": 3, "a3": 5}) r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) assert ( - r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN") - == 2 + r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN") + == 2 ) assert r.zrange("{foo}d", 0, -1, withscores=True) == [(b"a1", 1), (b"a3", 3)] @@ -1895,8 +1891,8 @@ def test_cluster_zunionstore_max(self, r): r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) assert ( - r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX") - == 4 + r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX") + == 4 ) assert r.zrange("{foo}d", 0, -1, withscores=True) == [ (b"a2", 2), @@ -1910,8 +1906,8 @@ def test_cluster_zunionstore_min(self, r): r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 4}) r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) assert ( - r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN") - == 4 + r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN") + == 4 ) assert r.zrange("{foo}d", 0, -1, withscores=True) == [ (b"a1", 1), @@ -2042,8 +2038,8 @@ def test_cluster_keys(self, r): for key in keys: r[key] = 1 assert ( - set(r.keys(pattern="test_*", target_nodes="primaries")) - == keys_with_underscores + set(r.keys(pattern="test_*", target_nodes="primaries")) + == keys_with_underscores ) assert set(r.keys(pattern="test*", target_nodes="primaries")) == keys @@ -2055,7 +2051,7 @@ def test_cluster_scan(self, r): r.set("c", 3) for target_nodes, nodes in zip( - ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] + ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] ): cursors, keys = r.scan(target_nodes=target_nodes) assert sorted(keys) == [b"a", b"b", b"c"] @@ -2076,7 +2072,7 @@ def test_cluster_scan_type(self, r): r.lpush("a-list", "aux", 3) for target_nodes, nodes in zip( - ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] + ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] ): cursors, keys = r.scan(_type="SET", target_nodes=target_nodes) assert sorted(keys) == [b"a-set", b"b-set", b"c-set"] @@ -2461,7 +2457,7 @@ def execute_command(*args, **kwargs): assert "Redis Cluster cannot be connected" in str(e.value) with patch.object( - CommandsParser, "initialize", autospec=True + CommandsParser, "initialize", autospec=True ) as cmd_parser_initialize: def cmd_init_mock(self, r): @@ -2652,15 +2648,15 @@ def test_blocked_arguments(self, r): r.pipeline(transaction=True) assert ( - str(ex.value).startswith("transaction is deprecated in cluster mode") - is True + str(ex.value).startswith("transaction is deprecated in cluster mode") + is True ) with pytest.raises(RedisClusterException) as ex: r.pipeline(shard_hint=True) assert ( - str(ex.value).startswith("shard_hint is deprecated in cluster mode") is True + str(ex.value).startswith("shard_hint is deprecated in cluster mode") is True ) def test_redis_cluster_pipeline(self, r): From b2b2efbb270e8d58febc5b05c9bb4c780f645e36 Mon Sep 17 00:00:00 2001 From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com> Date: Mon, 1 May 2023 14:48:30 +0530 Subject: [PATCH 3/6] fix: comment fix and async test --- redis/commands/cluster.py | 4 ++-- tests/test_asyncio/test_cluster.py | 6 ++++++ tests/test_cluster.py | 5 +++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 4c64269ea9..586e5ceccb 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -635,9 +635,9 @@ def cluster_shards(self, target_nodes=None): def cluster_myshardid(self, target_nodes=None): """ - Returns details about the shards of the cluster. + Returns the shard ID of the node. - For more information see https://redis.io/commands/cluster-shards + For more information see https://redis.io/commands/cluster-myshardid/ """ return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes) diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 13e5e26ae3..e73bfa0cc4 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -910,6 +910,12 @@ async def test_cluster_myid(self, r: RedisCluster) -> None: myid = await r.cluster_myid(node) assert len(myid) == 40 + @skip_if_redis_enterprise() + async def test_cluster_myshardid(self, r: RedisCluster) -> None: + node = r.get_random_node() + myshardid = await r.cluster_myshardid(node) + assert len(myshardid) == 40 + @skip_if_redis_enterprise() async def test_cluster_slots(self, r: RedisCluster) -> None: mock_all_nodes_resp(r, default_cluster_slots) diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 2ecb2cac87..6874b7c7ae 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -435,7 +435,7 @@ def test_refresh_using_specific_nodes(self, request): with patch.object(Redis, "parse_response") as parse_response: with patch.object(NodesManager, "initialize", autospec=True) as initialize: with patch.multiple( - Connection, send_command=DEFAULT, connect=DEFAULT, can_read=DEFAULT + Connection, send_command=DEFAULT, connect=DEFAULT, can_read=DEFAULT ) as mocks: # simulate 7006 as a failed node def parse_response_mock(connection, command_name, **options): @@ -1967,6 +1967,7 @@ def test_cluster_geosearchstore(self, r): r.geoadd("{foo}barcelona", values) r.geosearchstore( + "{foo}places_barcelona", "{foo}barcelona", longitude=2.191, @@ -2656,7 +2657,7 @@ def test_blocked_arguments(self, r): r.pipeline(shard_hint=True) assert ( - str(ex.value).startswith("shard_hint is deprecated in cluster mode") is True + str(ex.value).startswith("shard_hint is deprecated in cluster mode") is True ) def test_redis_cluster_pipeline(self, r): From 9ec7cc8851c40fee8c68d59862336e9f040c5253 Mon Sep 17 00:00:00 2001 From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com> Date: Mon, 1 May 2023 19:03:01 +0530 Subject: [PATCH 4/6] fix: adding version check --- tests/test_asyncio/test_cluster.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index e73bfa0cc4..98f4ef51be 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -910,6 +910,7 @@ async def test_cluster_myid(self, r: RedisCluster) -> None: myid = await r.cluster_myid(node) assert len(myid) == 40 + @skip_if_server_version_lt("7.2.0") @skip_if_redis_enterprise() async def test_cluster_myshardid(self, r: RedisCluster) -> None: node = r.get_random_node() From 171c1e3615854e72154f0854959d3199c39c43ac Mon Sep 17 00:00:00 2001 From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com> Date: Mon, 1 May 2023 19:33:30 +0530 Subject: [PATCH 5/6] fix lint: --- redis/commands/cluster.py | 75 ++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 586e5ceccb..fc4e189009 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -110,7 +110,7 @@ def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]] return slots_to_keys def _partition_pairs_by_slot( - self, mapping: Mapping[AnyKeyT, EncodableT] + self, mapping: Mapping[AnyKeyT, EncodableT] ) -> Dict[int, List[EncodableT]]: """Split pairs into a dictionary that maps a slot to a list of pairs.""" @@ -122,11 +122,12 @@ def _partition_pairs_by_slot( return slots_to_pairs def _execute_pipeline_by_slot( - self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] + self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] ) -> List[Any]: read_from_replicas = self.read_from_replicas and command in READ_COMMANDS pipe = self.pipeline() [ + pipe.execute_command( pipe.execute_command( command, *slot_args, @@ -139,10 +140,10 @@ def _execute_pipeline_by_slot( return pipe.execute() def _reorder_keys_by_command( - self, - keys: Iterable[KeyT], - slots_to_args: Mapping[int, Iterable[EncodableT]], - responses: Iterable[Any], + self, + keys: Iterable[KeyT], + slots_to_args: Mapping[int, Iterable[EncodableT]], + responses: Iterable[Any], ) -> List[Any]: results = { k: v @@ -318,7 +319,7 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) async def _execute_pipeline_by_slot( - self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] + self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] ) -> List[Any]: if self._initialize: await self.initialize() @@ -381,7 +382,7 @@ def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: return self.execute_command("CLUSTER MYID", target_nodes=target_node) def cluster_addslots( - self, target_node: "TargetNodesT", *slots: EncodableT + self, target_node: "TargetNodesT", *slots: EncodableT ) -> ResponseT: """ Assign new hash slots to receiving node. Sends to specified node. @@ -396,7 +397,7 @@ def cluster_addslots( ) def cluster_addslotsrange( - self, target_node: "TargetNodesT", *slots: EncodableT + self, target_node: "TargetNodesT", *slots: EncodableT ) -> ResponseT: """ Similar to the CLUSTER ADDSLOTS command. @@ -454,7 +455,7 @@ def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) def cluster_failover( - self, target_node: "TargetNodesT", option: Optional[str] = None + self, target_node: "TargetNodesT", option: Optional[str] = None ) -> ResponseT: """ Forces a slave to perform a manual failover of its master @@ -497,7 +498,7 @@ def cluster_keyslot(self, key: str) -> ResponseT: return self.execute_command("CLUSTER KEYSLOT", key) def cluster_meet( - self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None + self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Force a node cluster to handshake with another node. @@ -519,7 +520,7 @@ def cluster_nodes(self) -> ResponseT: return self.execute_command("CLUSTER NODES") def cluster_replicate( - self, target_nodes: "TargetNodesT", node_id: str + self, target_nodes: "TargetNodesT", node_id: str ) -> ResponseT: """ Reconfigure a node as a slave of the specified master node @@ -531,7 +532,7 @@ def cluster_replicate( ) def cluster_reset( - self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None + self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Reset a Redis Cluster node @@ -546,7 +547,7 @@ def cluster_reset( ) def cluster_save_config( - self, target_nodes: Optional["TargetNodesT"] = None + self, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Forces the node to save cluster state on disk @@ -564,7 +565,7 @@ def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) def cluster_set_config_epoch( - self, epoch: int, target_nodes: Optional["TargetNodesT"] = None + self, epoch: int, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Set the configuration epoch in a new node @@ -576,7 +577,7 @@ def cluster_set_config_epoch( ) def cluster_setslot( - self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str + self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str ) -> ResponseT: """ Bind an hash slot to a specific node @@ -605,7 +606,7 @@ def cluster_setslot_stable(self, slot_id: int) -> ResponseT: return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") def cluster_replicas( - self, node_id: str, target_nodes: Optional["TargetNodesT"] = None + self, node_id: str, target_nodes: Optional["TargetNodesT"] = None ) -> ResponseT: """ Provides a list of replica nodes replicating from the specified primary @@ -726,16 +727,16 @@ class ClusterDataAccessCommands(DataAccessCommands): """ def stralgo( - self, - algo: Literal["LCS"], - value1: KeyT, - value2: KeyT, - specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", - len: bool = False, - idx: bool = False, - minmatchlen: Optional[int] = None, - withmatchlen: bool = False, - **kwargs, + self, + algo: Literal["LCS"], + value1: KeyT, + value2: KeyT, + specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", + len: bool = False, + idx: bool = False, + minmatchlen: Optional[int] = None, + withmatchlen: bool = False, + **kwargs, ) -> ResponseT: """ Implements complex algorithms that operate on strings. @@ -773,11 +774,11 @@ def stralgo( ) def scan_iter( - self, - match: Optional[PatternT] = None, - count: Optional[int] = None, - _type: Optional[str] = None, - **kwargs, + self, + match: Optional[PatternT] = None, + count: Optional[int] = None, + _type: Optional[str] = None, + **kwargs, ) -> Iterator: # Do the first query with cursor=0 for all nodes cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) @@ -819,11 +820,11 @@ class AsyncClusterDataAccessCommands( """ async def scan_iter( - self, - match: Optional[PatternT] = None, - count: Optional[int] = None, - _type: Optional[str] = None, - **kwargs, + self, + match: Optional[PatternT] = None, + count: Optional[int] = None, + _type: Optional[str] = None, + **kwargs, ) -> AsyncIterator: # Do the first query with cursor=0 for all nodes cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) From 121881ee58af838b3885882e8ea10d9f765362bf Mon Sep 17 00:00:00 2001 From: dvora-h Date: Mon, 8 May 2023 10:09:13 +0300 Subject: [PATCH 6/6] linters --- redis/cluster.py | 68 ++++++++++++++++---------------- redis/commands/cluster.py | 13 +++--- tests/test_cluster.py | 83 ++++++++++++++++++++------------------- 3 files changed, 83 insertions(+), 81 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 970da86a72..2ab173ded9 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -74,7 +74,7 @@ def parse_pubsub_numsub(command, res, **options): def parse_cluster_slots( - resp: Any, **options: Any + resp: Any, **options: Any ) -> Dict[Tuple[int, int], Dict[str, Any]]: current_host = options.get("current_host", "") @@ -117,7 +117,7 @@ def parse_cluster_myshardid(resp, **options): """ Parse CLUSTER MYSHARDID response. """ - return resp.decode('utf-8') + return resp.decode("utf-8") PRIMARY = "primary" @@ -348,7 +348,7 @@ class AbstractRedisCluster: CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { "CLUSTER SLOTS": parse_cluster_slots, "CLUSTER SHARDS": parse_cluster_shards, - "CLUSTER MYSHARDID": parse_cluster_myshardid + "CLUSTER MYSHARDID": parse_cluster_myshardid, } RESULT_CALLBACKS = dict_merge( @@ -778,14 +778,14 @@ def pipeline(self, transaction=None, shard_hint=None): ) def lock( - self, - name, - timeout=None, - sleep=0.1, - blocking=True, - blocking_timeout=None, - lock_class=None, - thread_local=True, + self, + name, + timeout=None, + sleep=0.1, + blocking=True, + blocking_timeout=None, + lock_class=None, + thread_local=True, ): """ Return a new Lock object using key ``name`` that mimics @@ -951,7 +951,7 @@ def determine_slot(self, *args): if len(args) <= 2: raise RedisClusterException(f"Invalid args in command: {args}") num_actual_keys = args[2] - eval_keys = args[3: 3 + num_actual_keys] + eval_keys = args[3 : 3 + num_actual_keys] # if there are 0 keys, that means the script can be run on any node # so we can just return a random slot if len(eval_keys) == 0: @@ -1068,8 +1068,8 @@ def execute_command(self, *args, **kwargs): f"No targets were found to execute {args} command on" ) if ( - len(target_nodes) == 1 - and target_nodes[0] == self.get_default_node() + len(target_nodes) == 1 + and target_nodes[0] == self.get_default_node() ): is_default_node = True for node in target_nodes: @@ -1386,9 +1386,9 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None): primary_name, len(self.slots_cache[slot]) ) elif ( - server_type is None - or server_type == PRIMARY - or len(self.slots_cache[slot]) == 1 + server_type is None + or server_type == PRIMARY + or len(self.slots_cache[slot]) == 1 ): # return a primary node_idx = 0 @@ -1508,9 +1508,9 @@ def initialize(self): # If there's only one server in the cluster, its ``host`` is '' # Fix it to the host in startup_nodes if ( - len(cluster_slots) == 1 - and len(cluster_slots[0][2][0]) == 0 - and len(self.startup_nodes) == 1 + len(cluster_slots) == 1 + and len(cluster_slots[0][2][0]) == 0 + and len(self.startup_nodes) == 1 ): cluster_slots[0][2][0] = startup_node.host @@ -1764,17 +1764,17 @@ class ClusterPipeline(RedisCluster): ) def __init__( - self, - nodes_manager: "NodesManager", - commands_parser: "CommandsParser", - result_callbacks: Optional[Dict[str, Callable]] = None, - cluster_response_callbacks: Optional[Dict[str, Callable]] = None, - startup_nodes: Optional[List["ClusterNode"]] = None, - read_from_replicas: bool = False, - cluster_error_retry_attempts: int = 3, - reinitialize_steps: int = 5, - lock=None, - **kwargs, + self, + nodes_manager: "NodesManager", + commands_parser: "CommandsParser", + result_callbacks: Optional[Dict[str, Callable]] = None, + cluster_response_callbacks: Optional[Dict[str, Callable]] = None, + startup_nodes: Optional[List["ClusterNode"]] = None, + read_from_replicas: bool = False, + cluster_error_retry_attempts: int = 3, + reinitialize_steps: int = 5, + lock=None, + **kwargs, ): """ """ self.command_stack = [] @@ -1782,7 +1782,7 @@ def __init__( self.commands_parser = commands_parser self.refresh_table_asap = False self.result_callbacks = ( - result_callbacks or self.__class__.RESULT_CALLBACKS.copy() + result_callbacks or self.__class__.RESULT_CALLBACKS.copy() ) self.startup_nodes = startup_nodes if startup_nodes else [] self.read_from_replicas = read_from_replicas @@ -1905,7 +1905,7 @@ def reset(self): # self.connection = None def send_cluster_commands( - self, stack, raise_on_error=True, allow_redirections=True + self, stack, raise_on_error=True, allow_redirections=True ): """ Wrapper for CLUSTERDOWN error handling. @@ -1942,7 +1942,7 @@ def send_cluster_commands( raise e def _send_cluster_commands( - self, stack, raise_on_error=True, allow_redirections=True + self, stack, raise_on_error=True, allow_redirections=True ): """ Send a bunch of cluster commands to the redis cluster. diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index fc4e189009..cd93a85aba 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -127,7 +127,6 @@ def _execute_pipeline_by_slot( read_from_replicas = self.read_from_replicas and command in READ_COMMANDS pipe = self.pipeline() [ - pipe.execute_command( pipe.execute_command( command, *slot_args, @@ -319,7 +318,7 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) async def _execute_pipeline_by_slot( - self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] + self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] ) -> List[Any]: if self._initialize: await self.initialize() @@ -820,11 +819,11 @@ class AsyncClusterDataAccessCommands( """ async def scan_iter( - self, - match: Optional[PatternT] = None, - count: Optional[int] = None, - _type: Optional[str] = None, - **kwargs, + self, + match: Optional[PatternT] = None, + count: Optional[int] = None, + _type: Optional[str] = None, + **kwargs, ) -> AsyncIterator: # Do the first query with cursor=0 for all nodes cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 9e2b988022..705e753bd6 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -178,7 +178,7 @@ def execute_command(*_args, **_kwargs): execute_command_mock.side_effect = execute_command with patch.object( - CommandsParser, "initialize", autospec=True + CommandsParser, "initialize", autospec=True ) as cmd_parser_initialize: def cmd_init_mock(self, r): @@ -304,8 +304,8 @@ def test_startup_nodes(self): ] cluster = get_mocked_redis_client(startup_nodes=startup_nodes) assert ( - cluster.get_node(host=default_host, port=port_1) is not None - and cluster.get_node(host=default_host, port=port_2) is not None + cluster.get_node(host=default_host, port=port_1) is not None + and cluster.get_node(host=default_host, port=port_2) is not None ) def test_empty_startup_nodes(self): @@ -322,6 +322,7 @@ def test_empty_startup_nodes(self): def test_from_url(self, r): redis_url = f"redis://{default_host}:{default_port}/0" with patch.object(RedisCluster, "from_url") as from_url: + def from_url_mocked(_url, **_kwargs): return get_mocked_redis_client(url=_url, **_kwargs) @@ -416,6 +417,7 @@ def test_ask_redirection(self, r): """ redirect_node = r.get_nodes()[0] with patch.object(Redis, "parse_response") as parse_response: + def ask_redirect_effect(connection, *args, **options): def ok_response(connection, *args, **options): assert connection.host == redirect_node.host @@ -550,7 +552,7 @@ def map_7007(self): mocks["send_command"].return_value = "MOCK_OK" mocks["connect"].return_value = None with patch.object( - CommandsParser, "initialize", autospec=True + CommandsParser, "initialize", autospec=True ) as cmd_parser_initialize: def cmd_init_mock(self, r): @@ -583,14 +585,15 @@ def cmd_init_mock(self, r): def test_reading_from_replicas_in_round_robin(self): with patch.multiple( - Connection, - send_command=DEFAULT, - read_response=DEFAULT, - _connect=DEFAULT, - can_read=DEFAULT, - on_connect=DEFAULT, + Connection, + send_command=DEFAULT, + read_response=DEFAULT, + _connect=DEFAULT, + can_read=DEFAULT, + on_connect=DEFAULT, ) as mocks: with patch.object(Redis, "parse_response") as parse_response: + def parse_response_mock_first(connection, *args, **options): # Primary assert connection.port == 7001 @@ -659,8 +662,8 @@ def test_keyslot(self, r): def test_get_node_name(self): assert ( - get_node_name(default_host, default_port) - == f"{default_host}:{default_port}" + get_node_name(default_host, default_port) + == f"{default_host}:{default_port}" ) def test_all_nodes(self, r): @@ -694,6 +697,7 @@ def test_cluster_down_overreaches_retry_attempts(self, error): and then raise the exception """ with patch.object(RedisCluster, "_execute_command") as execute_command: + def raise_error(target_node, *args, **kwargs): execute_command.failed_calls += 1 raise error("mocked error") @@ -861,10 +865,10 @@ def test_cluster_retry_object(self, r) -> None: retry = Retry(ExponentialBackoff(10, 5), 5) rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry) assert ( - rc_custom_retry.get_node("127.0.0.1", 16379) - .redis_connection.get_retry() - ._retries - == retry._retries + rc_custom_retry.get_node("127.0.0.1", 16379) + .redis_connection.get_retry() + ._retries + == retry._retries ) def test_replace_cluster_node(self, r) -> None: @@ -944,8 +948,8 @@ class TestClusterRedisCommands: def test_case_insensitive_command_names(self, r): assert ( - r.cluster_response_callbacks["cluster slots"] - == r.cluster_response_callbacks["CLUSTER SLOTS"] + r.cluster_response_callbacks["cluster slots"] + == r.cluster_response_callbacks["CLUSTER SLOTS"] ) def test_get_and_set(self, r): @@ -1279,8 +1283,8 @@ def test_cluster_nodes(self, r): assert len(nodes) == 7 assert nodes.get("172.17.0.7:7006") is not None assert ( - nodes.get("172.17.0.7:7006").get("node_id") - == "c8253bae761cb1ecb2b61857d85dfe455a0fec8b" + nodes.get("172.17.0.7:7006").get("node_id") + == "c8253bae761cb1ecb2b61857d85dfe455a0fec8b" ) @skip_if_redis_enterprise() @@ -1403,8 +1407,8 @@ def test_cluster_replicas(self, r): assert replicas.get("127.0.0.1:6377") is not None assert replicas.get("127.0.0.1:6378") is not None assert ( - replicas.get("127.0.0.1:6378").get("node_id") - == "r4xfga22229cf3c652b6fca0d09ff69f3e0d4d" + replicas.get("127.0.0.1:6378").get("node_id") + == "r4xfga22229cf3c652b6fca0d09ff69f3e0d4d" ) @skip_if_server_version_lt("7.0.0") @@ -1473,8 +1477,8 @@ def _init_slowlog_test(self, r, node): def _teardown_slowlog_test(self, r, node, prev_limit): assert ( - r.config_set("slowlog-log-slower-than", prev_limit, target_nodes=node) - is True + r.config_set("slowlog-log-slower-than", prev_limit, target_nodes=node) + is True ) def test_slowlog_get(self, r, slowlog): @@ -1893,8 +1897,8 @@ def test_cluster_zinterstore_max(self, r): r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) assert ( - r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX") - == 2 + r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX") + == 2 ) assert r.zrange("{foo}d", 0, -1, withscores=True) == [(b"a3", 5), (b"a1", 6)] @@ -1903,8 +1907,8 @@ def test_cluster_zinterstore_min(self, r): r.zadd("{foo}b", {"a1": 2, "a2": 3, "a3": 5}) r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) assert ( - r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN") - == 2 + r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN") + == 2 ) assert r.zrange("{foo}d", 0, -1, withscores=True) == [(b"a1", 1), (b"a3", 3)] @@ -2007,8 +2011,8 @@ def test_cluster_zunionstore_max(self, r): r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) assert ( - r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX") - == 4 + r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX") + == 4 ) assert r.zrange("{foo}d", 0, -1, withscores=True) == [ (b"a2", 2), @@ -2022,8 +2026,8 @@ def test_cluster_zunionstore_min(self, r): r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 4}) r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) assert ( - r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN") - == 4 + r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN") + == 4 ) assert r.zrange("{foo}d", 0, -1, withscores=True) == [ (b"a1", 1), @@ -2083,7 +2087,6 @@ def test_cluster_geosearchstore(self, r): r.geoadd("{foo}barcelona", values) r.geosearchstore( - "{foo}places_barcelona", "{foo}barcelona", longitude=2.191, @@ -2155,8 +2158,8 @@ def test_cluster_keys(self, r): for key in keys: r[key] = 1 assert ( - set(r.keys(pattern="test_*", target_nodes="primaries")) - == keys_with_underscores + set(r.keys(pattern="test_*", target_nodes="primaries")) + == keys_with_underscores ) assert set(r.keys(pattern="test*", target_nodes="primaries")) == keys @@ -2168,7 +2171,7 @@ def test_cluster_scan(self, r): r.set("c", 3) for target_nodes, nodes in zip( - ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] + ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] ): cursors, keys = r.scan(target_nodes=target_nodes) assert sorted(keys) == [b"a", b"b", b"c"] @@ -2189,7 +2192,7 @@ def test_cluster_scan_type(self, r): r.lpush("a-list", "aux", 3) for target_nodes, nodes in zip( - ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] + ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] ): cursors, keys = r.scan(_type="SET", target_nodes=target_nodes) assert sorted(keys) == [b"a-set", b"b-set", b"c-set"] @@ -2574,7 +2577,7 @@ def execute_command(*args, **kwargs): assert "Redis Cluster cannot be connected" in str(e.value) with patch.object( - CommandsParser, "initialize", autospec=True + CommandsParser, "initialize", autospec=True ) as cmd_parser_initialize: def cmd_init_mock(self, r): @@ -2765,8 +2768,8 @@ def test_blocked_arguments(self, r): r.pipeline(transaction=True) assert ( - str(ex.value).startswith("transaction is deprecated in cluster mode") - is True + str(ex.value).startswith("transaction is deprecated in cluster mode") + is True ) with pytest.raises(RedisClusterException) as ex: