From b43ddb84e1970bae9e7b6ae19c73a490b73997c1 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 24 May 2024 14:56:57 +0200
Subject: [PATCH 1/5] CHINA-280: Set 1 hour request timeout for S3 during restore

---
 ch_backup/clickhouse/disks.py                 |  19 +-
 .../stages/storage/rate_limiter_stage.py      |   2 +-
 tests/unit/test_disks.py                      | 200 ++++++++++++++++++
 3 files changed, 218 insertions(+), 3 deletions(-)
 create mode 100644 tests/unit/test_disks.py

diff --git a/ch_backup/clickhouse/disks.py b/ch_backup/clickhouse/disks.py
index d761f3bd..6dcae4d7 100644
--- a/ch_backup/clickhouse/disks.py
+++ b/ch_backup/clickhouse/disks.py
@@ -30,6 +30,7 @@ class ClickHouseDisksException(RuntimeError):
 
 
 CH_DISK_CONFIG_PATH = "/tmp/clickhouse-disks-config.xml"
+CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS = 3600000
 
 
 class ClickHouseTemporaryDisks:
@@ -115,7 +116,7 @@ def _render_disks_config(self, path, disks):
         with open(path, "w", encoding="utf-8") as f:
             xmltodict.unparse(
                 {
-                    "yandex": {
+                    "clickhouse": {
                         "storage_configuration": {"disks": disks},
                     }
                 },
@@ -142,9 +143,23 @@ def _create_temporary_disk(
         disk_config["endpoint"] = os.path.join(
             f"{endpoint.scheme}://{endpoint_netloc}", source_bucket, source_path, ""
         )
+        disks_config = {tmp_disk_name: disk_config}
+
+        request_timeout_ms = int(disk_config.get("request_timeout_ms", 0))
+        if request_timeout_ms < CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS:
+            disks_config[tmp_disk_name]["request_timeout_ms"] = str(
+                CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS
+            )
+            disks_config[disk_name] = {
+                "request_timeout_ms": {
+                    "@replace": "replace",
+                    "#text": str(CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS),
+                }
+            }
+
         self._render_disks_config(
             _get_config_path(self._config_dir, tmp_disk_name),
-            {tmp_disk_name: disk_config},
+            disks_config,
         )
 
         self._ch_ctl.reload_config()
diff --git a/ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py b/ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py
index 8e56b9c3..b9b558e0 100644
--- a/ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py
+++ b/ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py
@@ -13,7 +13,7 @@ class RateLimiterStage(Handler):
     """
     A bottleneck for controlling the number of data to prevent excessive loading.
 
-    Based on tocken bucket algorithm.
+    Based on token bucket algorithm.
     """
 
     stype = StageType.STORAGE
diff --git a/tests/unit/test_disks.py b/tests/unit/test_disks.py
new file mode 100644
index 00000000..bf822bfb
--- /dev/null
+++ b/tests/unit/test_disks.py
@@ -0,0 +1,200 @@
+"""
+Unit tests disks module.
+""" + +import unittest + +import xmltodict +from tests.unit.utils import assert_equal, parametrize + +from ch_backup.backup_context import BackupContext +from ch_backup.clickhouse.config import ClickhouseConfig +from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks +from ch_backup.config import DEFAULT_CONFIG, Config + +write_result = "" + + +@parametrize( + { + "id": "No timeout", + "args": { + "clickhouse_config": """ + + + + + s3 + https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/ + AKIAACCESSKEY + SecretAccesskey + + + + + """, + "disk_name": "object_storage", + "source": { + "endpoint": "s3.us-west-1.amazonaws.com", + "bucket": "double-cloud-storage-chc0002", + "path": "cloud_storage/chc0002/s2/", + }, + "temp_config": """ + + + + + s3 + https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/ + AKIAACCESSKEY + SecretAccesskey + 3600000 + + + 3600000 + + + + + """, + }, + }, + { + "id": "Small timeout", + "args": { + "clickhouse_config": """ + + + + + s3 + https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/ + AKIAACCESSKEY + SecretAccesskey + 30000 + + + + + """, + "disk_name": "object_storage", + "source": { + "endpoint": "s3.us-west-1.amazonaws.com", + "bucket": "double-cloud-storage-chc0002", + "path": "cloud_storage/chc0002/s2/", + }, + "temp_config": """ + + + + + s3 + https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/ + AKIAACCESSKEY + SecretAccesskey + 3600000 + + + 3600000 + + + + + """, + }, + }, + { + "id": "Large timeout", + "args": { + "clickhouse_config": """ + + + + + s3 + https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/ + AKIAACCESSKEY + SecretAccesskey + 7200000 + + + + + """, + "disk_name": "object_storage", + "source": { + "endpoint": "s3.us-west-1.amazonaws.com", + "bucket": "double-cloud-storage-chc0002", + "path": "cloud_storage/chc0002/s2/", + }, + "temp_config": """ + + + + + s3 + https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/ + AKIAACCESSKEY + SecretAccesskey + 7200000 + + + + + """, + }, + }, +) +def test_temporary_disk(clickhouse_config, disk_name, source, temp_config): + context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type] + context.ch_ctl = unittest.mock.MagicMock() + context.backup_layout = unittest.mock.MagicMock() + context.backup_meta = unittest.mock.MagicMock() + with unittest.mock.patch( + "builtins.open", + new=unittest.mock.mock_open(read_data=clickhouse_config), + create=True, + ): + with unittest.mock.patch("yaml.load", return_value=""): + context.ch_config = ClickhouseConfig(Config("foo")) + context.ch_config.load() + with unittest.mock.patch("builtins.open", new=unittest.mock.mock_open()) as m: + disk = ClickHouseTemporaryDisks( + context.ch_ctl, + context.backup_layout, + context.config_root, + context.backup_meta, + source["bucket"], + source["path"], + source["endpoint"], + context.ch_config, + ) + + # pylint: disable=global-statement + global write_result + write_result = "" + m().write = write_collector + + # pylint: disable=protected-access + disk._create_temporary_disk( + context.backup_meta, + disk_name, + source["bucket"], + source["path"], + source["endpoint"], + ) + m.assert_called_with( + f"/etc/clickhouse-server/config.d/cloud_storage_tmp_disk_{disk_name}_source.xml", + "w", + encoding="utf-8", + ) + + expected_content = xmltodict.parse(temp_config) + actual_content = 
+        actual_content = xmltodict.parse(write_result)
+        assert_equal(actual_content, expected_content)
+
+
+def write_collector(x):
+    # pylint: disable=global-statement
+    global write_result
+    write_result += x.decode("utf-8")

From 92cb452f1f55ba85be36d27db5f8d52286c85206 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 24 May 2024 16:15:35 +0200
Subject: [PATCH 2/5] Fix build

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index ba26cb5c..fdde5143 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-requests>2.19
+requests<2.32.0
 boto3<1.19
 botocore<1.22
 psutil

From b86afee68c5fefdb950b1c08d5a5cfedaea37e75 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin <56930273+ianton-ru@users.noreply.github.com>
Date: Mon, 27 May 2024 11:22:21 +0300
Subject: [PATCH 3/5] Update ch_backup/clickhouse/disks.py

Co-authored-by: MikhailBurdukov <102754618+MikhailBurdukov@users.noreply.github.com>
---
 ch_backup/clickhouse/disks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ch_backup/clickhouse/disks.py b/ch_backup/clickhouse/disks.py
index 6dcae4d7..a4440d66 100644
--- a/ch_backup/clickhouse/disks.py
+++ b/ch_backup/clickhouse/disks.py
@@ -30,7 +30,7 @@ class ClickHouseDisksException(RuntimeError):
 
 
 CH_DISK_CONFIG_PATH = "/tmp/clickhouse-disks-config.xml"
-CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS = 3600000
+CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS = 1 * 60 * 60 * 1000
 
 
 class ClickHouseTemporaryDisks:

From ba078cd1a82918c35e1e178e81fe38b83d19c563 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Mon, 27 May 2024 15:14:45 +0200
Subject: [PATCH 4/5] Add timeout to /tmp/clickhouse-disks-config.xml too

---
 ch_backup/clickhouse/disks.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ch_backup/clickhouse/disks.py b/ch_backup/clickhouse/disks.py
index a4440d66..63af7f0f 100644
--- a/ch_backup/clickhouse/disks.py
+++ b/ch_backup/clickhouse/disks.py
@@ -156,6 +156,10 @@ def _create_temporary_disk(
                     "#text": str(CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS),
                 }
             }
+            if self._disks:
+                self._disks[disk_name]["request_timeout_ms"] = str(
+                    CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS
+                )
 
         self._render_disks_config(
             _get_config_path(self._config_dir, tmp_disk_name),
@@ -170,7 +174,7 @@ def _create_temporary_disk(
         )
 
         self._created_disks[tmp_disk_name] = source_disk
-        self._disks[tmp_disk_name] = disk_config
+        self._disks[tmp_disk_name] = disks_config[tmp_disk_name]
 
     def copy_parts(
         self,

From e17cd4e6c0139c3a8d30fc337c7a2e4a234a6406 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 29 May 2024 16:48:05 +0200
Subject: [PATCH 5/5] Fix requirements.txt

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index fdde5143..42b10bbc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-requests<2.32.0
+requests>2.19,<2.32
 boto3<1.19
 botocore<1.22
 psutil
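
For reference, a minimal standalone sketch (not part of the patch series) of the xmltodict convention the change relies on: dict keys prefixed with "@" become XML attributes and "#text" becomes the element text, which is how the request_timeout_ms override with replace="replace" is rendered for the original disk. The disk names and the 3600000 value below are taken from the diff; everything else is illustrative.

# Illustrative sketch only, assuming the dict layout built in _create_temporary_disk.
import sys

import xmltodict

disks = {
    "object_storage_source": {
        "type": "s3",
        "request_timeout_ms": "3600000",
    },
    "object_storage": {
        # "@replace" renders as the replace="replace" attribute, "#text" as element text.
        "request_timeout_ms": {
            "@replace": "replace",
            "#text": "3600000",
        }
    },
}

# Prints the same shape of XML that _render_disks_config writes to the temporary config file.
xmltodict.unparse(
    {"clickhouse": {"storage_configuration": {"disks": disks}}},
    sys.stdout,
    pretty=True,
)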