From 4f20a8d19864beb7393136ad048cc7b34287bf6f Mon Sep 17 00:00:00 2001
From: Anton Ivashkin <56930273+ianton-ru@users.noreply.github.com>
Date: Wed, 29 May 2024 18:30:40 +0300
Subject: [PATCH] CHINA-280: Set 1 hour request timeout for S3 during restore
(#154)
* CHINA-280: Set 1 hour request timeout for S3 during restore
* Fix build
* Update ch_backup/clickhouse/disks.py
Co-authored-by: MikhailBurdukov <102754618+MikhailBurdukov@users.noreply.github.com>
* Add timeout to /tmp/clickhouse-disks-config.xml too
* Fix requirements.txt
---------
Co-authored-by: MikhailBurdukov <102754618+MikhailBurdukov@users.noreply.github.com>
---
ch_backup/clickhouse/disks.py | 25 ++++-
tests/unit/test_disks.py | 200 ++++++++++++++++++++++++++++++++++
2 files changed, 222 insertions(+), 3 deletions(-)
create mode 100644 tests/unit/test_disks.py
diff --git a/ch_backup/clickhouse/disks.py b/ch_backup/clickhouse/disks.py
index d761f3bd..63af7f0f 100644
--- a/ch_backup/clickhouse/disks.py
+++ b/ch_backup/clickhouse/disks.py
@@ -30,6 +30,7 @@ class ClickHouseDisksException(RuntimeError):
CH_DISK_CONFIG_PATH = "/tmp/clickhouse-disks-config.xml"
+CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS = 1 * 60 * 60 * 1000
class ClickHouseTemporaryDisks:
@@ -115,7 +116,7 @@ def _render_disks_config(self, path, disks):
with open(path, "w", encoding="utf-8") as f:
xmltodict.unparse(
{
- "yandex": {
+ "clickhouse": {
"storage_configuration": {"disks": disks},
}
},
@@ -142,9 +143,27 @@ def _create_temporary_disk(
disk_config["endpoint"] = os.path.join(
f"{endpoint.scheme}://{endpoint_netloc}", source_bucket, source_path, ""
)
+ disks_config = {tmp_disk_name: disk_config}
+
+ request_timeout_ms = int(disk_config.get("request_timeout_ms", 0))
+ if request_timeout_ms < CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS:
+ disks_config[tmp_disk_name]["request_timeout_ms"] = str(
+ CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS
+ )
+ disks_config[disk_name] = {
+ "request_timeout_ms": {
+ "@replace": "replace",
+ "#text": str(CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS),
+ }
+ }
+ if self._disks:
+ self._disks[disk_name]["request_timeout_ms"] = str(
+ CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS
+ )
+
self._render_disks_config(
_get_config_path(self._config_dir, tmp_disk_name),
- {tmp_disk_name: disk_config},
+ disks_config,
)
self._ch_ctl.reload_config()
@@ -155,7 +174,7 @@ def _create_temporary_disk(
)
self._created_disks[tmp_disk_name] = source_disk
- self._disks[tmp_disk_name] = disk_config
+ self._disks[tmp_disk_name] = disks_config[tmp_disk_name]
def copy_parts(
self,
diff --git a/tests/unit/test_disks.py b/tests/unit/test_disks.py
new file mode 100644
index 00000000..bf822bfb
--- /dev/null
+++ b/tests/unit/test_disks.py
@@ -0,0 +1,200 @@
+"""
+Unit tests disks module.
+"""
+
+import unittest
+
+import xmltodict
+from tests.unit.utils import assert_equal, parametrize
+
+from ch_backup.backup_context import BackupContext
+from ch_backup.clickhouse.config import ClickhouseConfig
+from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks
+from ch_backup.config import DEFAULT_CONFIG, Config
+
+write_result = ""
+
+
+@parametrize(
+ {
+ "id": "No timeout",
+ "args": {
+ "clickhouse_config": """
+            <clickhouse>
+                <storage_configuration>
+                    <disks>
+                        <object_storage>
+                            <type>s3</type>
+                            <endpoint>https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/</endpoint>
+                            <access_key_id>AKIAACCESSKEY</access_key_id>
+                            <secret_access_key>SecretAccesskey</secret_access_key>
+                        </object_storage>
+                    </disks>
+                </storage_configuration>
+            </clickhouse>
+ """,
+ "disk_name": "object_storage",
+ "source": {
+ "endpoint": "s3.us-west-1.amazonaws.com",
+ "bucket": "double-cloud-storage-chc0002",
+ "path": "cloud_storage/chc0002/s2/",
+ },
+ "temp_config": """
+            <clickhouse>
+                <storage_configuration>
+                    <disks>
+                        <object_storage_source>
+                            <type>s3</type>
+                            <endpoint>https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/</endpoint>
+                            <access_key_id>AKIAACCESSKEY</access_key_id>
+                            <secret_access_key>SecretAccesskey</secret_access_key>
+                            <request_timeout_ms>3600000</request_timeout_ms>
+                        </object_storage_source>
+                        <object_storage>
+                            <request_timeout_ms replace="replace">3600000</request_timeout_ms>
+                        </object_storage>
+                    </disks>
+                </storage_configuration>
+            </clickhouse>
+ """,
+ },
+ },
+ {
+ "id": "Small timeout",
+ "args": {
+ "clickhouse_config": """
+            <clickhouse>
+                <storage_configuration>
+                    <disks>
+                        <object_storage>
+                            <type>s3</type>
+                            <endpoint>https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/</endpoint>
+                            <access_key_id>AKIAACCESSKEY</access_key_id>
+                            <secret_access_key>SecretAccesskey</secret_access_key>
+                            <request_timeout_ms>30000</request_timeout_ms>
+                        </object_storage>
+                    </disks>
+                </storage_configuration>
+            </clickhouse>
+ """,
+ "disk_name": "object_storage",
+ "source": {
+ "endpoint": "s3.us-west-1.amazonaws.com",
+ "bucket": "double-cloud-storage-chc0002",
+ "path": "cloud_storage/chc0002/s2/",
+ },
+ "temp_config": """
+            <clickhouse>
+                <storage_configuration>
+                    <disks>
+                        <object_storage_source>
+                            <type>s3</type>
+                            <endpoint>https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/</endpoint>
+                            <access_key_id>AKIAACCESSKEY</access_key_id>
+                            <secret_access_key>SecretAccesskey</secret_access_key>
+                            <request_timeout_ms>3600000</request_timeout_ms>
+                        </object_storage_source>
+                        <object_storage>
+                            <request_timeout_ms replace="replace">3600000</request_timeout_ms>
+                        </object_storage>
+                    </disks>
+                </storage_configuration>
+            </clickhouse>
+ """,
+ },
+ },
+ {
+ "id": "Large timeout",
+ "args": {
+ "clickhouse_config": """
+            <clickhouse>
+                <storage_configuration>
+                    <disks>
+                        <object_storage>
+                            <type>s3</type>
+                            <endpoint>https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/</endpoint>
+                            <access_key_id>AKIAACCESSKEY</access_key_id>
+                            <secret_access_key>SecretAccesskey</secret_access_key>
+                            <request_timeout_ms>7200000</request_timeout_ms>
+                        </object_storage>
+                    </disks>
+                </storage_configuration>
+            </clickhouse>
+ """,
+ "disk_name": "object_storage",
+ "source": {
+ "endpoint": "s3.us-west-1.amazonaws.com",
+ "bucket": "double-cloud-storage-chc0002",
+ "path": "cloud_storage/chc0002/s2/",
+ },
+ "temp_config": """
+            <clickhouse>
+                <storage_configuration>
+                    <disks>
+                        <object_storage_source>
+                            <type>s3</type>
+                            <endpoint>https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/</endpoint>
+                            <access_key_id>AKIAACCESSKEY</access_key_id>
+                            <secret_access_key>SecretAccesskey</secret_access_key>
+                            <request_timeout_ms>7200000</request_timeout_ms>
+                        </object_storage_source>
+                    </disks>
+                </storage_configuration>
+            </clickhouse>
+ """,
+ },
+ },
+)
+def test_temporary_disk(clickhouse_config, disk_name, source, temp_config):
+ context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type]
+ context.ch_ctl = unittest.mock.MagicMock()
+ context.backup_layout = unittest.mock.MagicMock()
+ context.backup_meta = unittest.mock.MagicMock()
+ with unittest.mock.patch(
+ "builtins.open",
+ new=unittest.mock.mock_open(read_data=clickhouse_config),
+ create=True,
+ ):
+ with unittest.mock.patch("yaml.load", return_value=""):
+ context.ch_config = ClickhouseConfig(Config("foo"))
+ context.ch_config.load()
+ with unittest.mock.patch("builtins.open", new=unittest.mock.mock_open()) as m:
+ disk = ClickHouseTemporaryDisks(
+ context.ch_ctl,
+ context.backup_layout,
+ context.config_root,
+ context.backup_meta,
+ source["bucket"],
+ source["path"],
+ source["endpoint"],
+ context.ch_config,
+ )
+
+ # pylint: disable=global-statement
+ global write_result
+ write_result = ""
+ m().write = write_collector
+
+ # pylint: disable=protected-access
+ disk._create_temporary_disk(
+ context.backup_meta,
+ disk_name,
+ source["bucket"],
+ source["path"],
+ source["endpoint"],
+ )
+ m.assert_called_with(
+ f"/etc/clickhouse-server/config.d/cloud_storage_tmp_disk_{disk_name}_source.xml",
+ "w",
+ encoding="utf-8",
+ )
+
+ expected_content = xmltodict.parse(temp_config)
+ actual_content = xmltodict.parse(write_result)
+ assert_equal(actual_content, expected_content)
+
+
+def write_collector(x):
+ # pylint: disable=global-statement
+ global write_result
+ write_result += x.decode("utf-8")