Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CHINA-280: Set 1 hour request timeout for S3 during restore #154

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions ch_backup/clickhouse/disks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ClickHouseDisksException(RuntimeError):


CH_DISK_CONFIG_PATH = "/tmp/clickhouse-disks-config.xml"
CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS = 1 * 60 * 60 * 1000


class ClickHouseTemporaryDisks:
Expand Down Expand Up @@ -115,7 +116,7 @@ def _render_disks_config(self, path, disks):
with open(path, "w", encoding="utf-8") as f:
xmltodict.unparse(
{
"yandex": {
"clickhouse": {
"storage_configuration": {"disks": disks},
}
},
Expand All @@ -142,9 +143,27 @@ def _create_temporary_disk(
disk_config["endpoint"] = os.path.join(
f"{endpoint.scheme}://{endpoint_netloc}", source_bucket, source_path, ""
)
disks_config = {tmp_disk_name: disk_config}

request_timeout_ms = int(disk_config.get("request_timeout_ms", 0))
if request_timeout_ms < CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS:
disks_config[tmp_disk_name]["request_timeout_ms"] = str(
CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS
)
disks_config[disk_name] = {
"request_timeout_ms": {
"@replace": "replace",
"#text": str(CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS),
}
Comment on lines +153 to +157
Copy link
Contributor

@MikhailBurdukov MikhailBurdukov May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation of replacing request_timeout_ms for clickhouse-server config? AFAIK there is only one long-running operation(CopyObject) and is it done using with clickhouse-disk. Maybe make sense to keep request_timeout_ms only for clickhouse-disks config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency.
If source disk is not used in clickhouse-server, better remove additional config for server at all.

}
if self._disks:
self._disks[disk_name]["request_timeout_ms"] = str(
CH_OBJECT_STORAGE_REQUEST_TIMEOUT_MS
)

self._render_disks_config(
_get_config_path(self._config_dir, tmp_disk_name),
{tmp_disk_name: disk_config},
disks_config,
)

self._ch_ctl.reload_config()
Expand All @@ -155,7 +174,7 @@ def _create_temporary_disk(
)

self._created_disks[tmp_disk_name] = source_disk
self._disks[tmp_disk_name] = disk_config
self._disks[tmp_disk_name] = disks_config[tmp_disk_name]

def copy_parts(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
class RateLimiterStage(Handler):
"""
A bottleneck for controlling the number of data to prevent excessive loading.
Based on tocken bucket algorithm.
Based on token bucket algorithm.
"""

stype = StageType.STORAGE
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
requests>2.19
requests>2.19,<2.32
boto3<1.19
botocore<1.22
psutil
Expand Down
200 changes: 200 additions & 0 deletions tests/unit/test_disks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
"""
Unit tests disks module.
"""

import unittest

import xmltodict
from tests.unit.utils import assert_equal, parametrize

from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.config import ClickhouseConfig
from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks
from ch_backup.config import DEFAULT_CONFIG, Config

write_result = ""


@parametrize(
{
"id": "No timeout",
"args": {
"clickhouse_config": """
<clickhouse>
<storage_configuration>
<disks>
<object_storage>
<type>s3</type>
<endpoint>https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/</endpoint>
<access_key_id>AKIAACCESSKEY</access_key_id>
<secret_access_key>SecretAccesskey</secret_access_key>
</object_storage>
</disks>
</storage_configuration>
</clickhouse>
""",
"disk_name": "object_storage",
"source": {
"endpoint": "s3.us-west-1.amazonaws.com",
"bucket": "double-cloud-storage-chc0002",
"path": "cloud_storage/chc0002/s2/",
},
"temp_config": """
<clickhouse>
<storage_configuration>
<disks>
<object_storage_source>
<type>s3</type>
<endpoint>https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/</endpoint>
<access_key_id>AKIAACCESSKEY</access_key_id>
<secret_access_key>SecretAccesskey</secret_access_key>
<request_timeout_ms>3600000</request_timeout_ms>
</object_storage_source>
<object_storage>
<request_timeout_ms replace="replace">3600000</request_timeout_ms>
</object_storage>
</disks>
</storage_configuration>
</clickhouse>
""",
},
},
{
"id": "Small timeout",
"args": {
"clickhouse_config": """
<clickhouse>
<storage_configuration>
<disks>
<object_storage>
<type>s3</type>
<endpoint>https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/</endpoint>
<access_key_id>AKIAACCESSKEY</access_key_id>
<secret_access_key>SecretAccesskey</secret_access_key>
<request_timeout_ms>30000</request_timeout_ms>
</object_storage>
</disks>
</storage_configuration>
</clickhouse>
""",
"disk_name": "object_storage",
"source": {
"endpoint": "s3.us-west-1.amazonaws.com",
"bucket": "double-cloud-storage-chc0002",
"path": "cloud_storage/chc0002/s2/",
},
"temp_config": """
<clickhouse>
<storage_configuration>
<disks>
<object_storage_source>
<type>s3</type>
<endpoint>https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/</endpoint>
<access_key_id>AKIAACCESSKEY</access_key_id>
<secret_access_key>SecretAccesskey</secret_access_key>
<request_timeout_ms>3600000</request_timeout_ms>
</object_storage_source>
<object_storage>
<request_timeout_ms replace="replace">3600000</request_timeout_ms>
</object_storage>
</disks>
</storage_configuration>
</clickhouse>
""",
},
},
{
"id": "Large timeout",
"args": {
"clickhouse_config": """
<clickhouse>
<storage_configuration>
<disks>
<object_storage>
<type>s3</type>
<endpoint>https://s3.eu-central-1.amazonaws.com/double-cloud-storage-chc0001/cloud_storage/chc0001/s1/</endpoint>
<access_key_id>AKIAACCESSKEY</access_key_id>
<secret_access_key>SecretAccesskey</secret_access_key>
<request_timeout_ms>7200000</request_timeout_ms>
</object_storage>
</disks>
</storage_configuration>
</clickhouse>
""",
"disk_name": "object_storage",
"source": {
"endpoint": "s3.us-west-1.amazonaws.com",
"bucket": "double-cloud-storage-chc0002",
"path": "cloud_storage/chc0002/s2/",
},
"temp_config": """
<clickhouse>
<storage_configuration>
<disks>
<object_storage_source>
<type>s3</type>
<endpoint>https://s3.us-west-1.amazonaws.com/double-cloud-storage-chc0002/cloud_storage/chc0002/s2/</endpoint>
<access_key_id>AKIAACCESSKEY</access_key_id>
<secret_access_key>SecretAccesskey</secret_access_key>
<request_timeout_ms>7200000</request_timeout_ms>
</object_storage_source>
</disks>
</storage_configuration>
</clickhouse>
""",
},
},
)
def test_temporary_disk(clickhouse_config, disk_name, source, temp_config):
context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type]
context.ch_ctl = unittest.mock.MagicMock()
context.backup_layout = unittest.mock.MagicMock()
context.backup_meta = unittest.mock.MagicMock()
with unittest.mock.patch(
"builtins.open",
new=unittest.mock.mock_open(read_data=clickhouse_config),
create=True,
):
with unittest.mock.patch("yaml.load", return_value=""):
context.ch_config = ClickhouseConfig(Config("foo"))
context.ch_config.load()
with unittest.mock.patch("builtins.open", new=unittest.mock.mock_open()) as m:
disk = ClickHouseTemporaryDisks(
context.ch_ctl,
context.backup_layout,
context.config_root,
context.backup_meta,
source["bucket"],
source["path"],
source["endpoint"],
context.ch_config,
)

# pylint: disable=global-statement
global write_result
write_result = ""
m().write = write_collector

# pylint: disable=protected-access
disk._create_temporary_disk(
context.backup_meta,
disk_name,
source["bucket"],
source["path"],
source["endpoint"],
)
m.assert_called_with(
f"/etc/clickhouse-server/config.d/cloud_storage_tmp_disk_{disk_name}_source.xml",
"w",
encoding="utf-8",
)

expected_content = xmltodict.parse(temp_config)
actual_content = xmltodict.parse(write_result)
assert_equal(actual_content, expected_content)


def write_collector(x):
# pylint: disable=global-statement
global write_result
write_result += x.decode("utf-8")
Loading