Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel FREEZE TABLE #164

Merged
merged 24 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4c3acbf
Freeze and backup thread pools
kirillgarbar Jun 19, 2024
faecbed
Remove freezed data for one table
kirillgarbar Jun 20, 2024
8807c86
Freeze workers option in config
kirillgarbar Jun 20, 2024
1e86b09
More flexibility of data generation for tests
kirillgarbar Jun 22, 2024
6b7182b
Parallel freeze test
kirillgarbar Jun 22, 2024
c45b386
Move executors to subclass
kirillgarbar Jun 22, 2024
ec20451
Partition by prefix to fix dedup tests
kirillgarbar Jun 22, 2024
8b795ae
Fix codespell
kirillgarbar Jun 24, 2024
4f8bfec
Fix for old python
kirillgarbar Jun 24, 2024
7554870
Remove backup executor and backup in the main thread
kirillgarbar Jun 25, 2024
06cb097
Fix black
kirillgarbar Jun 25, 2024
95bc77b
Fix race condition with shadow/increment.txt
kirillgarbar Jun 25, 2024
0214f1e
Parallel backup
kirillgarbar Jul 1, 2024
6933163
Fix unit test - can't pickle lock
kirillgarbar Jul 1, 2024
d0a25c9
Revert "Fix unit test - can't pickle lock"
kirillgarbar Jul 8, 2024
827a66d
Revert "Parallel backup"
kirillgarbar Jul 8, 2024
93da739
Backup tables after all tables are frozen
kirillgarbar Jul 8, 2024
b7a4358
Move thread pool related logic to separate class
kirillgarbar Jul 8, 2024
6d1076a
Move all freeze related logic to TableFreezer
kirillgarbar Jul 8, 2024
c70ad1f
Small fixes
kirillgarbar Jul 8, 2024
190539c
Fix exception suppression in TableFreezer's __exit__
kirillgarbar Jul 8, 2024
59fdf82
Fix mypy
kirillgarbar Jul 9, 2024
2f12cb7
Fix black
kirillgarbar Jul 9, 2024
87f9d30
Move freeze logic to the method
kirillgarbar Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ch_backup/ch_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ def backup(
databases,
db_tables,
schema_only=sources.schema_only,
freeze_threads=self._config["multiprocessing"][
"freeze_threads"
],
)

self._context.backup_meta.state = BackupState.CREATED
Expand Down
51 changes: 45 additions & 6 deletions ch_backup/clickhouse/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ch_backup.exceptions import ClickhouseBackupError
from ch_backup.util import (
chown_dir_contents,
chown_file,
escape,
list_dir_files,
retry,
Expand Down Expand Up @@ -474,15 +475,32 @@ def system_unfreeze(self, backup_name: str) -> None:
query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name)
self._ch_client.query(query_sql, timeout=self._unfreeze_timeout)

def remove_freezed_data(
    self, backup_name: Optional[str] = None, table: Optional[Table] = None
) -> None:
    """
    Remove all freezed partitions from all local disks.

    When both backup_name and table are given, only that table's shadow
    data for that backup is removed; otherwise the whole shadow directory
    of every local disk is cleared. Passing exactly one of the two
    arguments is an error.
    """
    if (backup_name is None) != (table is None):
        raise RuntimeError(
            "Both backup_name and table should be None or not None at the same time"
        )

    if backup_name and table:
        # Narrow removal: only this table's frozen parts for this backup.
        for table_data_path, disk in table.paths_with_disks:
            if disk.type != "local":
                continue
            table_relative_path = os.path.relpath(table_data_path, disk.path)
            shadow_path = os.path.join(
                disk.path, "shadow", backup_name, table_relative_path
            )
            logging.debug("Removing shadow data: {}", shadow_path)
            self._remove_shadow_data(shadow_path)
    else:
        # Full removal: wipe the shadow directory on every local disk.
        for disk in self._disks.values():
            if disk.type != "local":
                continue
            shadow_path = os.path.join(disk.path, "shadow")
            logging.debug("Removing shadow data: {}", shadow_path)
            self._remove_shadow_data(shadow_path)

def remove_freezed_part(self, part: FrozenPart) -> None:
"""
Expand Down Expand Up @@ -776,6 +794,27 @@ def chown_dir(self, dir_path: str) -> None:
need_recursion,
)

def create_shadow_increment(self) -> None:
    """
    Create shadow/increment.txt to fix a race condition with parallel freeze.

    Must be used before freezing more than one table at once.
    https://github.com/ClickHouse/ClickHouse/blob/597a72fd9afd88984abc10b284624c6b4d08368b/src/Common/Increment.h#L20
    """
    shadow_dir = Path(self._root_data_path) / "shadow"
    increment_file = shadow_dir / "increment.txt"

    # Nothing to do if a previous freeze already created the counter file.
    if os.path.exists(increment_file):
        return

    if not os.path.exists(shadow_dir):
        os.mkdir(shadow_dir)
        self.chown_dir(str(shadow_dir))

    with open(increment_file, "w", encoding="utf-8") as file:
        file.write("0")
    # Hand the file over to the ClickHouse server's user/group so the
    # server can update the counter itself.
    chown_file(
        self._ch_ctl_config["user"],
        self._ch_ctl_config["group"],
        str(increment_file),
    )

@retry(OSError)
def _remove_shadow_data(self, path: str) -> None:
if path.find("/shadow") == -1:
Expand Down
2 changes: 2 additions & 0 deletions ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ def _as_seconds(t: str) -> int:
"workers": 4,
# The number of processes for parts restoring from S3 disks.
"cloud_storage_restore_workers": 4,
# The number of threads for parallel freeze of tables
"freeze_threads": 4,
},
"pipeline": {
# Is asynchronous pipelines used (based on Pypeln library)
Expand Down
188 changes: 126 additions & 62 deletions ch_backup/logic/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import os
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from functools import partial
from itertools import chain
Expand Down Expand Up @@ -55,10 +56,12 @@ def backup(
databases: Sequence[Database],
db_tables: Dict[str, list],
schema_only: bool,
freeze_threads: int,
) -> None:
"""
Backup tables metadata, MergeTree data and Cloud storage metadata.
"""

backup_name = context.backup_meta.get_sanitized_name()

for db in databases:
Expand All @@ -68,7 +71,9 @@ def backup(
db_tables[db.name],
backup_name,
schema_only,
freeze_threads,
)

self._backup_cloud_storage_metadata(context)

def _collect_local_metadata_mtime(
Expand Down Expand Up @@ -103,6 +108,7 @@ def _backup(
tables: Sequence[str],
backup_name: str,
schema_only: bool,
freeze_threads: int,
) -> None:
"""
Backup single database tables.
Expand All @@ -113,22 +119,131 @@ def _backup(
# To ensure consistency between metadata and data backups.
# See https://en.wikipedia.org/wiki/Optimistic_concurrency_control
mtimes = self._collect_local_metadata_mtime(context, db, tables)
tables_ = list(
filter(
lambda table: table.name in mtimes,
context.ch_ctl.get_tables(db.name, tables),
)
)

for table in context.ch_ctl.get_tables(db.name, tables):
if table.name not in mtimes:
continue
freezed_tables = self._freeze_tables(
context, db, tables_, backup_name, schema_only, freeze_threads
)

self._backup_table(
logging.debug('All tables from "{}" are frozen', db.name)

for table, create_statement in freezed_tables:
self._backup_freezed_table(
context,
db,
table,
backup_name,
schema_only,
mtimes,
create_statement,
)

context.ch_ctl.remove_freezed_data()

context.backup_layout.upload_backup_metadata(context.backup_meta)

@staticmethod
def _freeze_tables(
    context: BackupContext,
    db: Database,
    tables: Sequence[Table],
    backup_name: str,
    schema_only: bool,
    threads: int,
) -> List[Tuple[Table, bytes]]:
    """
    Freeze tables in parallel.

    Returns (table, create statement) pairs for the tables that were
    successfully frozen; tables skipped by the worker are filtered out.
    """
    # shadow/increment.txt must exist before submitting work, otherwise
    # concurrent FREEZE queries race on creating it.
    context.ch_ctl.create_shadow_increment()

    with ThreadPoolExecutor(max_workers=threads) as pool:
        futures: List[Future] = [
            pool.submit(
                TableBackup._freeze_table,
                context,
                db,
                table,
                backup_name,
                schema_only,
            )
            for table in tables
        ]
        # Collect in completion order; future.result() re-raises any
        # exception raised by the worker.
        return [
            pair
            for pair in (future.result() for future in as_completed(futures))
            if pair is not None
        ]

@staticmethod
def _freeze_table(
    context: BackupContext,
    db: Database,
    table: Table,
    backup_name: str,
    schema_only: bool,
) -> Optional[Tuple[Table, bytes]]:
    """
    Freeze a single table and return it together with its create statement.

    Returns None when the table has no local metadata or was dropped by a
    user while the backup was running.
    """
    logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)

    create_statement = TableBackup._load_create_statement_from_disk(table)
    if not create_statement:
        logging.warning(
            'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
            db.name,
            table.name,
        )
        return None

    # Only MergeTree tables carry data that needs freezing; for schema-only
    # backups the create statement alone is enough.
    if schema_only or not is_merge_tree(table.engine):
        return (table, create_statement)

    try:
        context.ch_ctl.freeze_table(backup_name, table)
    except ClickhouseError:
        # A failed FREEZE on a still-existing table is a real error.
        if context.ch_ctl.does_table_exist(table.database, table.name):
            raise

        logging.warning(
            'Table "{}"."{}" was removed by a user during backup',
            table.database,
            table.name,
        )
        return None

    return (table, create_statement)

@staticmethod
def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
    """
    Load a create statement of the table from a metadata file on the disk.

    Returns None when the table has no metadata path or the file cannot
    be read.
    """
    metadata_path = table.metadata_path
    if not metadata_path:
        logging.debug(
            'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
            table.database,
            table.name,
        )
        return None

    try:
        return Path(metadata_path).read_bytes()
    except OSError as e:
        logging.debug(
            'Cannot load a create statement of the table "{}"."{}": {}',
            table.database,
            table.name,
            str(e),
        )
        return None

@staticmethod
def _backup_cloud_storage_metadata(context: BackupContext) -> None:
"""
Expand Down Expand Up @@ -272,70 +387,16 @@ def restore(
keep_going=keep_going,
)

@staticmethod
def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
"""
Load a create statement of the table from a metadata file on the disk.
"""
if not table.metadata_path:
logging.debug(
'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
table.database,
table.name,
)
return None
try:
return Path(table.metadata_path).read_bytes()
except OSError as e:
logging.debug(
'Cannot load a create statement of the table "{}"."{}": {}',
table.database,
table.name,
str(e),
)
return None

def _backup_table(
def _backup_freezed_table(
self,
context: BackupContext,
db: Database,
table: Table,
backup_name: str,
schema_only: bool,
mtimes: Dict[str, TableMetadataMtime],
create_statement: bytes,
) -> None:
"""
Make backup of metadata and data of single table.

Return backup metadata of successfully backuped table, otherwise None.
"""
logging.debug(
'Performing table backup for "{}"."{}"', table.database, table.name
)
create_statement = self._load_create_statement_from_disk(table)
if not create_statement:
logging.warning(
'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
db.name,
table.name,
)
return

# Freeze only MergeTree tables
if not schema_only and is_merge_tree(table.engine):
try:
context.ch_ctl.freeze_table(backup_name, table)
except ClickhouseError:
if context.ch_ctl.does_table_exist(table.database, table.name):
raise

logging.warning(
'Table "{}"."{}" was removed by a user during backup',
table.database,
table.name,
)
return

# Check if table metadata was updated
new_mtime = self._get_mtime(table.metadata_path)
if new_mtime is None or mtimes[table.name].mtime != new_mtime:
Expand All @@ -344,9 +405,12 @@ def _backup_table(
table.database,
table.name,
)
context.ch_ctl.remove_freezed_data()
context.ch_ctl.remove_freezed_data(backup_name, table)
return

logging.debug(
'Performing table backup for "{}"."{}"', table.database, table.name
)
# Add table metadata to backup metadata
context.backup_meta.add_table(
TableMetadata(table.database, table.name, table.engine, table.uuid)
Expand Down Expand Up @@ -436,7 +500,7 @@ def deduplicate_parts_in_batch(

self._validate_uploaded_parts(context, upload_observer.uploaded_parts)

context.ch_ctl.remove_freezed_data()
context.ch_ctl.remove_freezed_data(backup_name, table)

@staticmethod
def _validate_uploaded_parts(context: BackupContext, uploaded_parts: list) -> None:
Expand Down
7 changes: 7 additions & 0 deletions ch_backup/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ def chown_dir_contents(
shutil.chown(os.path.join(dir_path, path), user, group)


def chown_file(user: str, group: str, file_path: str) -> None:
    """
    Change owner user/group of a single file.
    """
    # Note: docstring previously said "directory" (copy-paste from
    # chown_dir_contents); this helper operates on one file path.
    shutil.chown(file_path, user, group)


def list_dir_files(dir_path: str) -> List[str]:
"""
Returns paths of all files of directory (recursively), relative to its path
Expand Down
1 change: 1 addition & 0 deletions tests/integration/ch_backup.featureset
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ features/cloud_storage.feature
features/database_engines.feature
features/ddl_dictionary.feature
features/deduplication.feature
features/freeze_parallel.feature
features/incremental_restore.feature
features/metadata.feature
features/min_interval.feature
Expand Down
Loading