Parallel FREEZE TABLE #164

Merged
24 commits merged on Jul 10, 2024
Changes from 14 commits

Commits (24)
4c3acbf
Freeze and backup thread pools
kirillgarbar Jun 19, 2024
faecbed
Remove freezed data for one table
kirillgarbar Jun 20, 2024
8807c86
Freeze workers option in config
kirillgarbar Jun 20, 2024
1e86b09
More flexibility of data generation for tests
kirillgarbar Jun 22, 2024
6b7182b
Parallel freeze test
kirillgarbar Jun 22, 2024
c45b386
Move executors to subclass
kirillgarbar Jun 22, 2024
ec20451
Partition by prefix to fix dedup tests
kirillgarbar Jun 22, 2024
8b795ae
Fix codespell
kirillgarbar Jun 24, 2024
4f8bfec
Fix for old python
kirillgarbar Jun 24, 2024
7554870
Remove backup executor and backup in the main thread
kirillgarbar Jun 25, 2024
06cb097
Fix black
kirillgarbar Jun 25, 2024
95bc77b
Fix race condition with shadow/increment.txt
kirillgarbar Jun 25, 2024
0214f1e
Parallel backup
kirillgarbar Jul 1, 2024
6933163
Fix unit test - can't pickle lock
kirillgarbar Jul 1, 2024
d0a25c9
Revert "Fix unit test - can't pickle lock"
kirillgarbar Jul 8, 2024
827a66d
Revert "Parallel backup"
kirillgarbar Jul 8, 2024
93da739
Backup tables after all tables are frozen
kirillgarbar Jul 8, 2024
b7a4358
Move thread pool related logic to separate class
kirillgarbar Jul 8, 2024
6d1076a
Move all freeze related logic to TableFreezer
kirillgarbar Jul 8, 2024
c70ad1f
Small fixes
kirillgarbar Jul 8, 2024
190539c
Fix exception suppression in TableFreezer's __exit__
kirillgarbar Jul 8, 2024
59fdf82
Fix mypy
kirillgarbar Jul 9, 2024
2f12cb7
Fix black
kirillgarbar Jul 9, 2024
87f9d30
Move freeze logic to the method
kirillgarbar Jul 9, 2024
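
Taken together, the commits converge on one shape: tables are frozen in parallel by a thread pool sized by the new `backup_threads` option, and backed up only after every freeze has finished (see "Revert 'Parallel backup'" and "Backup tables after all tables are frozen"). A minimal sketch of that flow, using hypothetical helper names rather than the PR's actual TableFreezer/TableBackup API:

```python
# Hypothetical sketch of the flow this PR converges on; freeze_table and
# backup_table stand in for the real per-table logic.
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def freeze_and_backup(
    tables: Iterable[str],
    freeze_table: Callable[[str], None],
    backup_table: Callable[[str], None],
    backup_threads: int = 4,
) -> None:
    table_list = list(tables)
    with ThreadPoolExecutor(max_workers=backup_threads) as pool:
        futures = [pool.submit(freeze_table, table) for table in table_list]
        for future in futures:
            future.result()  # re-raise freeze errors before uploading anything
    # Backup runs in the main thread, only after all tables are frozen.
    for table in table_list:
        backup_table(table)
```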
27 changes: 16 additions & 11 deletions ch_backup/backup/metadata/backup_metadata.py
@@ -6,6 +6,7 @@
 import socket
 from datetime import datetime, timezone
 from enum import Enum
+from threading import Lock
 from typing import Any, Dict, List, Optional, Sequence
 
 from ch_backup.backup.metadata.access_control_metadata import AccessControlMetadata
@@ -70,6 +71,8 @@ def __init__(
         self._user_defined_functions: List[str] = []
         self._named_collections: List[str] = []
 
+        self._lock = Lock()
+
     def __str__(self) -> str:
         return self.dump_json()
 
@@ -264,16 +267,17 @@ def add_table(self, table: TableMetadata) -> None:
         """
         Add table to backup metadata.
         """
-        tables = self._databases[table.database]["tables"]
+        with self._lock:
+            tables = self._databases[table.database]["tables"]
 
-        assert table.name not in tables
+            assert table.name not in tables
 
-        tables[table.name] = table.raw_metadata
+            tables[table.name] = table.raw_metadata
 
-        for part in table.get_parts():
-            self.size += part.size
-            if not part.link:
-                self.real_size += part.size
+            for part in table.get_parts():
+                self.size += part.size
+                if not part.link:
+                    self.real_size += part.size
 
     def add_udf(self, udf_name: str) -> None:
         """
@@ -328,11 +332,12 @@ def add_part(self, part: PartMetadata) -> None:
         """
         Add data part to backup metadata.
         """
-        self.get_table(part.database, part.table).add_part(part)
+        with self._lock:
+            self.get_table(part.database, part.table).add_part(part)
 
-        self.size += part.size
-        if not part.link:
-            self.real_size += part.size
+            self.size += part.size
+            if not part.link:
+                self.real_size += part.size
 
     def remove_parts(self, table: TableMetadata, parts: List[PartMetadata]) -> None:
         """
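
Why `add_table` and `add_part` now take a lock: with several freeze workers populating the same BackupMetadata, the `self.size += part.size` accumulators are read-modify-write operations and can lose updates without synchronization. A standalone illustration of the failure mode (not ch-backup code):

```python
# Standalone sketch (not ch-backup code) of the race the Lock prevents:
# "+=" on a shared counter is not atomic across threads.
from concurrent.futures import ThreadPoolExecutor
from threading import Lock


class MetadataStub:
    def __init__(self) -> None:
        self.size = 0
        self._lock = Lock()

    def add_part(self, part_size: int) -> None:
        with self._lock:  # drop the lock and the final size may come up short
            self.size += part_size


meta = MetadataStub()
with ThreadPoolExecutor(max_workers=4) as pool:
    for _ in range(10_000):
        pool.submit(meta.add_part, 1)
print(meta.size)  # 10000 with the lock; potentially less without it
```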
4 changes: 3 additions & 1 deletion ch_backup/ch_backup.py
@@ -48,7 +48,9 @@ def __init__(self, config: Config) -> None:
         self._config = config
         self._access_backup_manager = AccessBackup()
         self._database_backup_manager = DatabaseBackup()
-        self._table_backup_manager = TableBackup()
+        self._table_backup_manager = TableBackup(
+            self._config.get("multiprocessing").get("backup_threads", 1)
+        )
         self._udf_backup_manager = UDFBackup()
         self._nc_backup_manager = NamedCollectionsBackup()
 
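
Note the two different defaults: config.py (see the last file in this diff) ships a packaged default of 4, while this lookup falls back to 1 only if the `multiprocessing` section carries no `backup_threads` key at all. A small sketch of the chained lookup, with plain dicts standing in for the Config object:

```python
# Plain dicts stand in for the Config object here; only the fallback behaviour
# of the chained .get() calls is being illustrated.
config = {"multiprocessing": {"workers": 4}}  # key absent -> fallback of 1
print(config.get("multiprocessing").get("backup_threads", 1))  # 1

config = {"multiprocessing": {"backup_threads": 8}}  # explicitly configured
print(config.get("multiprocessing").get("backup_threads", 1))  # 8
```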
70 changes: 52 additions & 18 deletions ch_backup/clickhouse/control.py
@@ -24,6 +24,7 @@
 from ch_backup.exceptions import ClickhouseBackupError
 from ch_backup.util import (
     chown_dir_contents,
+    chown_file,
     escape,
     list_dir_files,
     retry,
@@ -209,9 +210,12 @@
     CREATE TABLE IF NOT EXISTS `{system_db}`._deduplication_info_current (
         name String,
         checksum String,
+        database String,
+        table String,
     )
     ENGINE = MergeTree()
     ORDER BY (name, checksum)
+    PARTITION BY (database, table)
     """
 )
 
@@ -474,15 +478,27 @@ def system_unfreeze(self, backup_name: str) -> None:
         query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name)
         self._ch_client.query(query_sql, timeout=self._unfreeze_timeout)
 
-    def remove_freezed_data(self) -> None:
+    def remove_freezed_data(
+        self, backup_name: Optional[str] = None, table: Optional[Table] = None
+    ) -> None:
         """
         Remove all freezed partitions from all local disks.
         """
-        for disk in self._disks.values():
-            if disk.type == "local":
-                shadow_path = os.path.join(disk.path, "shadow")
-                logging.debug("Removing shadow data: {}", shadow_path)
-                self._remove_shadow_data(shadow_path)
+        if backup_name and table:
+            for table_data_path, disk in table.paths_with_disks:
+                if disk.type == "local":
+                    table_relative_path = os.path.relpath(table_data_path, disk.path)
+                    shadow_path = os.path.join(
+                        disk.path, "shadow", backup_name, table_relative_path
+                    )
+                    logging.debug("Removing shadow data: {}", shadow_path)
+                    self._remove_shadow_data(shadow_path)
+        else:
+            for disk in self._disks.values():
+                if disk.type == "local":
+                    shadow_path = os.path.join(disk.path, "shadow")
+                    logging.debug("Removing shadow data: {}", shadow_path)
+                    self._remove_shadow_data(shadow_path)
 
     def remove_freezed_part(self, part: FrozenPart) -> None:
         """

Reviewer comment (Contributor) on the new remove_freezed_data signature: "Let's introduce a new object with backup_name and table as members. Or add checking and throwing an exception when only one from the pair is provided."
@@ -776,6 +792,26 @@ def chown_dir(self, dir_path: str) -> None:
             need_recursion,
         )
 
+    def create_shadow_increment(self) -> None:
+        """
+        Create shadow/increment.txt to fix race condition with parallel freeze.
+        Must be used before freezing more than one table at once.
+        """
+        default_shadow_path = Path(self._root_data_path) / "shadow"
+        increment_path = default_shadow_path / "increment.txt"
+        if os.path.exists(increment_path):
+            return
+        if not os.path.exists(default_shadow_path):
+            os.mkdir(default_shadow_path)
+            self.chown_dir(str(default_shadow_path))
+        with open(increment_path, "w", encoding="utf-8") as file:
+            file.write("0")
+        chown_file(
+            self._ch_ctl_config["user"],
+            self._ch_ctl_config["group"],
+            str(increment_path),
+        )
+
     @retry(OSError)
     def _remove_shadow_data(self, path: str) -> None:
         if path.find("/shadow") == -1:

Reviewer comment (Contributor) on the create_shadow_increment docstring: "Is there some issue or comment in the code of CH about this to mention here?"
@@ -921,12 +957,13 @@ def create_deduplication_table(self):
                 system_db=escape(self._backup_config["system_database"])
             )
         )
-        self._ch_client.query(
-            TRUNCATE_TABLE_IF_EXISTS_SQL.format(
-                db_name=escape(self._backup_config["system_database"]),
-                table_name="_deduplication_info",
-            )
-        )
+        for table_name in ["_deduplication_info", "_deduplication_info_current"]:
+            self._ch_client.query(
+                TRUNCATE_TABLE_IF_EXISTS_SQL.format(
+                    db_name=escape(self._backup_config["system_database"]),
+                    table_name=table_name,
+                )
+            )
         self._ch_client.query(
             CREATE_IF_NOT_EXISTS_DEDUP_TABLE_SQL.format(
                 system_db=escape(self._backup_config["system_database"])
@@ -952,19 +989,16 @@ def get_deduplication_info(
         """
         Get deduplication info for given frozen parts of a table
         """
-        self._ch_client.query(
-            TRUNCATE_TABLE_IF_EXISTS_SQL.format(
-                db_name=escape(self._backup_config["system_database"]),
-                table_name="_deduplication_info_current",
-            )
-        )
         self._ch_client.query(
             CREATE_IF_NOT_EXISTS_DEDUP_TABLE_CURRENT_SQL.format(
                 system_db=escape(self._backup_config["system_database"])
             )
         )
 
-        batch = [f"('{part.name}','{part.checksum}')" for part in frozen_parts.values()]
+        batch = [
+            f"('{part.name}','{part.checksum}','{database}','{table}')"
+            for part in frozen_parts.values()
+        ]
         self._ch_client.query(
             INSERT_DEDUP_INFO_BATCH_SQL.format(
                 system_db=escape(self._backup_config["system_database"]),
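
With the extra columns, each row in `_deduplication_info_current` now records which table it belongs to, and `PARTITION BY (database, table)` keeps each table's rows in a separate partition, which is presumably what the "Partition by prefix to fix dedup tests" commit refers to. A small sketch of the resulting batch shape, using made-up part data:

```python
# Made-up part data, only to show the row shape inserted into
# _deduplication_info_current after this change.
from collections import namedtuple

FrozenPartStub = namedtuple("FrozenPartStub", ["name", "checksum"])

frozen_parts = {
    "all_1_1_0": FrozenPartStub("all_1_1_0", "9c3a1f"),
    "all_2_2_0": FrozenPartStub("all_2_2_0", "4b7d20"),
}
database, table = "db1", "events"

batch = [
    f"('{part.name}','{part.checksum}','{database}','{table}')"
    for part in frozen_parts.values()
]
print(",".join(batch))
# ('all_1_1_0','9c3a1f','db1','events'),('all_2_2_0','4b7d20','db1','events')
```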
2 changes: 2 additions & 0 deletions ch_backup/config.py
@@ -156,6 +156,8 @@ def _as_seconds(t: str) -> int:
         "workers": 4,
         # The number of processes for parts restoring from S3 disks.
         "cloud_storage_restore_workers": 4,
+        # The number of threads for parallel freeze and backup of tables
+        "backup_threads": 4,
     },
     "pipeline": {
         # Is asynchronous pipelines used (based on Pypeln library)