diff --git a/ch_backup/clickhouse/disks.py b/ch_backup/clickhouse/disks.py index d761f3bd..63af7f0f 100644 --- a/ch_backup/clickhouse/disks.py +++ b/ch_backup/clickhouse/disks.py @@ -30,6 +30,7 @@ class ClickHouseDisksException(RuntimeError): CH_DISK_CONFIG_PATH = "/tmp/clickhouse-disks-config.xml" +CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS = 1 * 60 * 60 * 1000 class ClickHouseTemporaryDisks: @@ -115,7 +116,7 @@ def _render_disks_config(self, path, disks): with open(path, "w", encoding="utf-8") as f: xmltodict.unparse( { - "yandex": { + "clickhouse": { "storage_configuration": {"disks": disks}, } }, @@ -142,9 +143,27 @@ def _create_temporary_disk( disk_config["endpoint"] = os.path.join( f"{endpoint.scheme}://{endpoint_netloc}", source_bucket, source_path, "" ) + disks_config = {tmp_disk_name: disk_config} + + request_timeout_ms = int(disk_config.get("request_timeout_ms", 0)) + if request_timeout_ms < CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS: + disks_config[tmp_disk_name]["request_timeout_ms"] = str( + CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS + ) + disks_config[disk_name] = { + "request_timeout_ms": { + "@replace": "replace", + "#text": str(CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS), + } + } + if self._disks: + self._disks[disk_name]["request_timeout_ms"] = str( + CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS + ) + self._render_disks_config( _get_config_path(self._config_dir, tmp_disk_name), - {tmp_disk_name: disk_config}, + disks_config, ) self._ch_ctl.reload_config() @@ -155,7 +174,7 @@ def _create_temporary_disk( ) self._created_disks[tmp_disk_name] = source_disk - self._disks[tmp_disk_name] = disk_config + self._disks[tmp_disk_name] = disks_config[tmp_disk_name] def copy_parts( self, diff --git a/ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py b/ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py index 8e56b9c3..b9b558e0 100644 --- a/ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py +++ b/ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py @@ -13,7 +13,7 @@ class RateLimiterStage(Handler): """ A bottleneck for controlling the number of data to prevent excessive loading. - Based on tocken bucket algorithm. + Based on token bucket algorithm. """ stype = StageType.STORAGE diff --git a/requirements.txt b/requirements.txt index ba26cb5c..42b10bbc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -requests>2.19 +requests>2.19,<2.32 boto3<1.19 botocore<1.22 psutil diff --git a/tests/unit/test_disks.py b/tests/unit/test_disks.py new file mode 100644 index 00000000..bf822bfb --- /dev/null +++ b/tests/unit/test_disks.py @@ -0,0 +1,200 @@ +""" +Unit tests disks module. +""" + +import unittest + +import xmltodict +from tests.unit.utils import assert_equal, parametrize + +from ch_backup.backup_context import BackupContext +from ch_backup.clickhouse.config import ClickhouseConfig +from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks +from ch_backup.config import DEFAULT_CONFIG, Config + +write_result = "" + + +@parametrize( + { + "id": "No timeout", + "args": { + "clickhouse_config": """ + + + + + s3 + https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/ + AKIAACCESSKEY + SecretAccesskey + + + + + """, + "disk_name": "object_storage", + "source": { + "endpoint": "s3.us-west-1.amazonaws.com", + "bucket": "double-cloud-storage-chc0002", + "path": "cloud_storage/chc0002/s2/", + }, + "temp_config": """ + + + + + s3 + https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/ + AKIAACCESSKEY + SecretAccesskey + 3600000 + + + 3600000 + + + + + """, + }, + }, + { + "id": "Small timeout", + "args": { + "clickhouse_config": """ + + + + + s3 + https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/ + AKIAACCESSKEY + SecretAccesskey + 30000 + + + + + """, + "disk_name": "object_storage", + "source": { + "endpoint": "s3.us-west-1.amazonaws.com", + "bucket": "double-cloud-storage-chc0002", + "path": "cloud_storage/chc0002/s2/", + }, + "temp_config": """ + + + + + s3 + https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/ + AKIAACCESSKEY + SecretAccesskey + 3600000 + + + 3600000 + + + + + """, + }, + }, + { + "id": "Large timeout", + "args": { + "clickhouse_config": """ + + + + + s3 + https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/ + AKIAACCESSKEY + SecretAccesskey + 7200000 + + + + + """, + "disk_name": "object_storage", + "source": { + "endpoint": "s3.us-west-1.amazonaws.com", + "bucket": "double-cloud-storage-chc0002", + "path": "cloud_storage/chc0002/s2/", + }, + "temp_config": """ + + + + + s3 + https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/ + AKIAACCESSKEY + SecretAccesskey + 7200000 + + + + + """, + }, + }, +) +def test_temporary_disk(clickhouse_config, disk_name, source, temp_config): + context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type] + context.ch_ctl = unittest.mock.MagicMock() + context.backup_layout = unittest.mock.MagicMock() + context.backup_meta = unittest.mock.MagicMock() + with unittest.mock.patch( + "builtins.open", + new=unittest.mock.mock_open(read_data=clickhouse_config), + create=True, + ): + with unittest.mock.patch("yaml.load", return_value=""): + context.ch_config = ClickhouseConfig(Config("foo")) + context.ch_config.load() + with unittest.mock.patch("builtins.open", new=unittest.mock.mock_open()) as m: + disk = ClickHouseTemporaryDisks( + context.ch_ctl, + context.backup_layout, + context.config_root, + context.backup_meta, + source["bucket"], + source["path"], + source["endpoint"], + context.ch_config, + ) + + # pylint: disable=global-statement + global write_result + write_result = "" + m().write = write_collector + + # pylint: disable=protected-access + disk._create_temporary_disk( + context.backup_meta, + disk_name, + source["bucket"], + source["path"], + source["endpoint"], + ) + m.assert_called_with( + f"/etc/clickhouse-server/config.d/cloud_storage_tmp_disk_{disk_name}_source.xml", + "w", + encoding="utf-8", + ) + + expected_content = xmltodict.parse(temp_config) + actual_content = xmltodict.parse(write_result) + assert_equal(actual_content, expected_content) + + +def write_collector(x): + # pylint: disable=global-statement + global write_result + write_result += x.decode("utf-8")