From eaa382f5f9f519ec394354d2ed83f0785d8570da Mon Sep 17 00:00:00 2001 From: Aleksandr Bykov Date: Fri, 20 Dec 2024 19:25:06 +0700 Subject: [PATCH] fix(scan_operations): add retry policy to cql query The node where scan operations was started could be used by disruptive nemesis. If node was restarted/stopped while scan query had been running, the scan operation would be terminated and error event and message will mark test as failed. Add to cql session ExponetionalBackoffRetryPolicy which allow to retry the query, if node was down and once it back, query will be succesfully finished Fixes: #9284 --- sdcm/scan_operation_thread.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/sdcm/scan_operation_thread.py b/sdcm/scan_operation_thread.py index d33a0fbcc6..78c9838399 100644 --- a/sdcm/scan_operation_thread.py +++ b/sdcm/scan_operation_thread.py @@ -11,11 +11,13 @@ from abc import abstractmethod from string import Template from typing import Optional, Type, NamedTuple, TYPE_CHECKING +from contextlib import contextmanager from pytz import utc from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout from cassandra.cluster import ResponseFuture, ResultSet # pylint: disable=no-name-in-module from cassandra.query import SimpleStatement # pylint: disable=no-name-in-module +from cassandra.policies import ExponentialBackoffRetryPolicy from sdcm.remote import LocalCmdRunner from sdcm.sct_events import Severity @@ -106,6 +108,10 @@ def __init__(self, generator: random.Random, thread_params: ThreadParams, thread self.db_node = self._get_random_node() self.current_operation_stat = None self.log.info("FullscanOperationBase init finished") + self._exp_backoff_retry_policy_params = { + "max_num_retries": 15.0, "min_interval": 1.0, "max_interval": 1800.0 + } + self._request_default_timeout = 1800 def _get_random_node(self) -> BaseNode: return self.generator.choice(self.fullscan_params.db_cluster.data_nodes) @@ -141,11 +147,7 @@ def run_scan_event(self, cmd: str, cmd=cmd ) - with self.fullscan_params.db_cluster.cql_connection_patient( - node=self.db_node, - connect_timeout=300, - user=self.fullscan_params.user, - password=self.fullscan_params.user_password) as session: + with self.cql_connection(connect_timeout=300) as session: try: scan_op_event.message = '' start_time = time.time() @@ -191,6 +193,18 @@ def fetch_result_pages(self, result, read_pages): if read_pages > 0: pages += 1 + @contextmanager + def cql_connection(self, **kwargs): + node = kwargs.pop("node", self.db_node) + with self.fullscan_params.db_cluster.cql_connection_patient( + node=node, + user=self.fullscan_params.user, + password=self.fullscan_params.user_password, **kwargs) as session: + session.cluster.default_retry_policy = ExponentialBackoffRetryPolicy( + **self._exp_backoff_retry_policy_params) + session.default_timeout = self._request_default_timeout + yield session + class FullScanOperation(FullscanOperationBase): def __init__(self, generator, **kwargs): @@ -239,7 +253,7 @@ def __init__(self, generator, **kwargs): def get_table_clustering_order(self) -> str: node = self._get_random_node() try: - with self.fullscan_params.db_cluster.cql_connection_patient(node=node, connect_timeout=300) as session: + with self.cql_connection(node=node, connect_timeout=300) as session: # Using CL ONE. No need for a quorum since querying a constant fixed attribute of a table. session.default_consistency_level = ConsistencyLevel.ONE return get_table_clustering_order(ks_cf=self.fullscan_params.ks_cf, @@ -264,8 +278,7 @@ def randomly_form_cql_statement(self) -> Optional[tuple[str, str]]: # pylint: d """ db_node = self._get_random_node() - with self.fullscan_params.db_cluster.cql_connection_patient( - node=db_node, connect_timeout=300) as session: + with self.cql_connection(node=db_node, connect_timeout=300) as session: ck_random_min_value = self.generator.randint(a=1, b=self.fullscan_params.rows_count) ck_random_max_value = self.generator.randint(a=ck_random_min_value, b=self.fullscan_params.rows_count) self.ck_filter = ck_filter = self.generator.choice(list(self.reversed_query_filter_ck_by.keys()))