
[MDB-31705] Parallel system drop replica #196

Merged: 2 commits, Oct 21, 2024
Changes from all commits
1 change: 1 addition & 0 deletions ch_backup/ch_backup.py
@@ -527,6 +527,7 @@ def _restore(
select_replica_drop(
replica_name, self._context.ch_ctl.get_macros()
),
self._config["multiprocessing"]["drop_replica_threads"],
)

# Restore databases.
36 changes: 27 additions & 9 deletions ch_backup/clickhouse/metadata_cleaner.py
@@ -4,6 +4,7 @@

import copy
import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List, Optional

from ch_backup import logging
@@ -38,22 +39,24 @@ class MetadataCleaner:
Class for cleaning up replica metadata from zookeeper.
"""

def __init__(self, ch_ctl: ClickhouseCTL, replica_to_drop: str) -> None:
def __init__(
self, ch_ctl: ClickhouseCTL, replica_to_drop: str, max_workers: int
) -> None:
self._ch_ctl = ch_ctl
self._macros = self._ch_ctl.get_macros()
self._replica_to_drop = replica_to_drop or self._macros.get("replica")

if not self._replica_to_drop:
raise ConfigurationError(
"Can't get the replica name. Please, specify it through macros or replica_name knob."
)
self._exec_pool = ThreadPoolExecutor(max_workers)

def clean_tables_metadata(self, replicated_tables: List[Table]) -> None:
"""
Remove replica tables metadata from zookeeper.
"""
replicated_table_paths = get_table_zookeeper_paths(replicated_tables)

tasks: Dict[str, Future] = {}
for table, table_path in replicated_table_paths:
table_macros = copy.copy(self._macros)
macros_to_override = dict(
@@ -64,14 +67,21 @@ def clean_tables_metadata(self, replicated_tables: List[Table]) -> None:
path_resolved = os.path.abspath(replace_macros(table_path, table_macros))

logging.debug(
"Removing replica {} from table {} metadata from zookeeper {}.",
"Scheduling drop replica {} from table {} metadata from zookeeper {}.",
self._replica_to_drop,
f"{table.database}.{table.database}",
path_resolved,
)
future = self._exec_pool.submit(
self._ch_ctl.system_drop_replica, self._replica_to_drop, path_resolved # type: ignore
)
tasks[f"{table.database}.{table.name}"] = future

        for full_table_name, future in tasks.items():
            try:
                future.result()
logging.debug(
"Successful zk metadata cleanup for table {}", full_table_name
)
except ClickhouseError as ch_error:
if "does not look like a table path" in str(ch_error):
@@ -93,6 +103,7 @@ def clean_database_metadata(self, replicated_databases: List[Database]) -> None:
return

replicated_databases_paths = get_database_zookeeper_paths(replicated_databases)
tasks: Dict[str, Future] = {}

for database, database_path, shard in replicated_databases_paths:
db_macros = copy.copy(self._macros)
@@ -106,14 +117,21 @@ def clean_database_metadata(self, replicated_databases: List[Database]) -> None:
)

logging.debug(
"Removing replica {} from database {} metadata from zookeeper {}.",
"Scheduling replica {} from database {} metadata from zookeeper {}.",
full_replica_name,
database.name,
path_resolved,
)
future = self._exec_pool.submit(
self._ch_ctl.system_drop_database_replica, full_replica_name, path_resolved # type: ignore
)
tasks[f"{database.name}"] = future

for database_name, future in tasks.items():
            try:
                future.result()
logging.debug(
"Successful zk metadata cleanup for database {}", database_name
)
except ClickhouseError as ch_error:
if "does not look like a path of Replicated database" in str(
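The cleaner now follows a schedule-then-collect pattern: every drop call is submitted to the ThreadPoolExecutor first, and the futures are drained afterwards so per-table (or per-database) errors still surface one by one. Below is a minimal, self-contained sketch of that pattern; the drop_replica helper and the sample path are illustrative stand-ins, not ch-backup code.

from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict


def drop_replica(zookeeper_path: str) -> None:
    # Stand-in for the real ClickhouseCTL.system_drop_replica call (hypothetical here).
    print(f"dropping replica metadata at {zookeeper_path}")


def drop_all(paths: Dict[str, str], max_workers: int = 8) -> None:
    pool = ThreadPoolExecutor(max_workers)
    tasks: Dict[str, Future] = {}
    # Phase 1: schedule every drop without waiting on any of them.
    for name, path in paths.items():
        tasks[name] = pool.submit(drop_replica, path)
    # Phase 2: collect results; an exception raised in a worker re-raises here,
    # so failures can still be handled per item.
    for name, future in tasks.items():
        future.result()
        print(f"cleaned up {name}")


drop_all({"db1.table1": "/clickhouse/tables/shard1/db1.table1"}, max_workers=8)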
2 changes: 2 additions & 0 deletions ch_backup/config.py
@@ -160,6 +160,8 @@ def _as_seconds(t: str) -> int:
"cloud_storage_restore_workers": 4,
# The number of threads for parallel freeze of tables
"freeze_threads": 4,
# The number of threads for parallel drop replica
"drop_replica_threads": 8,
},
"pipeline": {
# Is asynchronous pipelines used (based on Pypeln library)
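The new knob sits next to the existing multiprocessing defaults and is read with a plain nested lookup; a short sketch, assuming the simplified dict below in place of ch-backup's full config loading:

# Simplified view of the defaults shown in the hunk above (not the full config).
config = {
    "multiprocessing": {
        "freeze_threads": 4,
        "drop_replica_threads": 8,  # default added by this PR
    }
}

# The restore path passes this value to MetadataCleaner as max_workers.
max_workers = config["multiprocessing"]["drop_replica_threads"]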