Skip to content

Commit

Permalink
MDB-30734 Metadata cleanup with system drop replica (#185)
Browse files Browse the repository at this point in the history
* MDB-30734 Metadata cleanup with system drop replica

* test fix

* Add cleanup for database

* Test fix

* fix

* again

* Fix for 22.8

* Review fix

* Missed
  • Loading branch information
MikhailBurdukov authored Sep 12, 2024
1 parent 0c7fadc commit 849cd5b
Show file tree
Hide file tree
Showing 20 changed files with 422 additions and 101 deletions.
10 changes: 9 additions & 1 deletion ch_backup/backup/metadata/backup_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,13 @@ def get_database(self, db_name: str) -> Database:
Get database.
"""
db_meta = self._databases[db_name]
return Database(db_name, db_meta.get("engine"), db_meta.get("metadata_path"))
return Database(
db_name,
db_meta.get("engine"),
db_meta.get("metadata_path"),
db_meta.get("uuid"),
db_meta.get("engine_full"),
)

def add_database(self, db: Database) -> None:
"""
Expand All @@ -239,6 +245,8 @@ def add_database(self, db: Database) -> None:
self._databases[db.name] = {
"engine": db.engine,
"metadata_path": db.metadata_path,
"uuid": db.uuid,
"engine_full": db.engine_full,
"tables": {},
}

Expand Down
18 changes: 15 additions & 3 deletions ch_backup/ch_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ch_backup.backup.metadata import BackupMetadata, BackupState, TableMetadata
from ch_backup.backup.sources import BackupSources
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.metadata_cleaner import MetadataCleaner, select_replica_drop
from ch_backup.clickhouse.models import Database
from ch_backup.config import Config
from ch_backup.exceptions import (
Expand Down Expand Up @@ -514,8 +515,20 @@ def _restore(
db.set_engine_from_sql(db_sql)
databases[db_name] = db

metadata_cleaner: Optional[MetadataCleaner] = None

if clean_zookeeper and len(self._context.zk_config.get("hosts")) > 0:
metadata_cleaner = MetadataCleaner(
self._context.ch_ctl,
select_replica_drop(
replica_name, self._context.ch_ctl.get_macros()
),
)

# Restore databases.
self._database_backup_manager.restore(self._context, databases, keep_going)
self._database_backup_manager.restore(
self._context, databases, keep_going, metadata_cleaner
)

# Restore tables and data stored on local disks.
self._table_backup_manager.restore(
Expand All @@ -524,12 +537,11 @@ def _restore(
schema_only=sources.schema_only,
tables=tables,
exclude_tables=exclude_tables,
replica_name=replica_name,
metadata_cleaner=metadata_cleaner,
cloud_storage_source_bucket=cloud_storage_source_bucket,
cloud_storage_source_path=cloud_storage_source_path,
cloud_storage_source_endpoint=cloud_storage_source_endpoint,
skip_cloud_storage=skip_cloud_storage,
clean_zookeeper=clean_zookeeper,
keep_going=keep_going,
)

Expand Down
67 changes: 64 additions & 3 deletions ch_backup/clickhouse/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,39 @@
"""
)

DROP_REPLICA_BY_ZK_PATH_SQL = strip_query(
"""
SYSTEM DROP REPLICA '{replica_name}' FROM ZKPATH '{zk_path}'
"""
)

DROP_DATABASE_REPLICA_BY_ZK_PATH_SQL = strip_query(
"""
SYSTEM DROP DATABASE REPLICA '{replica_name}' FROM ZKPATH '{zk_path}'
"""
)

GET_DATABASES_SQL = strip_query(
"""
SELECT
name,
engine,
metadata_path
metadata_path,
uuid,
engine_full
FROM system.databases
WHERE name NOT IN ('system', '_temporary_and_external_tables', 'information_schema', 'INFORMATION_SCHEMA', '{system_db}')
FORMAT JSON
"""
)

GET_DATABASES_SQL_22_8 = strip_query(
"""
SELECT
name,
engine,
metadata_path,
uuid
FROM system.databases
WHERE name NOT IN ('system', '_temporary_and_external_tables', 'information_schema', 'INFORMATION_SCHEMA', '{system_db}')
FORMAT JSON
Expand Down Expand Up @@ -361,6 +388,7 @@ def __init__(
self._freeze_timeout = self._ch_ctl_config["freeze_timeout"]
self._unfreeze_timeout = self._ch_ctl_config["unfreeze_timeout"]
self._restore_replica_timeout = self._ch_ctl_config["restore_replica_timeout"]
self._drop_replica_timeout = self._ch_ctl_config["drop_replica_timeout"]
self._ch_client = ClickhouseClient(self._ch_ctl_config)
self._ch_version = self._ch_client.query(GET_VERSION_SQL)
self._disks = self.get_disks()
Expand Down Expand Up @@ -503,12 +531,23 @@ def get_databases(

result: List[Database] = []
system_database = self._backup_config["system_database"]
ch_resp = self._ch_client.query(

query = (
GET_DATABASES_SQL.format(system_db=system_database)
if self.ch_version_ge("23.3")
else GET_DATABASES_SQL_22_8.format(system_db=system_database)
)

ch_resp = self._ch_client.query(query)
if "data" in ch_resp:
result = [
Database(row["name"], row["engine"], row["metadata_path"])
Database(
row["name"],
row["engine"],
row["metadata_path"],
row["uuid"],
row.get("engine_full"),
)
for row in ch_resp["data"]
if row["name"] not in exclude_dbs
]
Expand Down Expand Up @@ -666,6 +705,28 @@ def drop_named_collection(self, nc_name: str) -> None:
"""
self._ch_client.query(DROP_NAMED_COLLECTION_SQL.format(nc_name=escape(nc_name)))

def system_drop_replica(self, replica: str, zookeeper_path: str) -> None:
"""
System drop replica query.
"""
self._ch_client.query(
DROP_REPLICA_BY_ZK_PATH_SQL.format(
replica_name=replica, zk_path=zookeeper_path
),
timeout=self._drop_replica_timeout,
)

def system_drop_database_replica(self, replica: str, zookeeper_path: str) -> None:
"""
System drop database replica query.
"""
self._ch_client.query(
DROP_DATABASE_REPLICA_BY_ZK_PATH_SQL.format(
replica_name=replica, zk_path=zookeeper_path
),
timeout=self._drop_replica_timeout,
)

def get_database_metadata_path(self, database: str) -> str:
"""
Get filesystem absolute path to database metadata.
Expand Down
127 changes: 127 additions & 0 deletions ch_backup/clickhouse/metadata_cleaner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
Zookeeper metadata cleaner for clickhouse.
"""

import copy
import os
from typing import Dict, List, Optional

from ch_backup import logging
from ch_backup.clickhouse.client import ClickhouseError
from ch_backup.clickhouse.control import ClickhouseCTL
from ch_backup.clickhouse.models import Database, Table
from ch_backup.exceptions import ConfigurationError
from ch_backup.util import (
get_database_zookeeper_paths,
get_table_zookeeper_paths,
replace_macros,
)


def select_replica_drop(replica_name: Optional[str], macros: Dict) -> str:
    """
    Pick the replica whose metadata should be dropped: an explicitly
    passed name wins, otherwise fall back to the 'replica' macro.
    """
    chosen = replica_name or macros.get("replica")
    if chosen:
        return chosen
    raise ConfigurationError(
        "Can't get the replica name. Please, specify it through macros or replica_name knob."
    )


class MetadataCleaner:
    """
    Class for cleaning up replica metadata from zookeeper.

    Uses SYSTEM DROP REPLICA / SYSTEM DROP DATABASE REPLICA on behalf of a
    single replica name resolved from the argument or the 'replica' macro.
    """

    def __init__(self, ch_ctl: ClickhouseCTL, replica_to_drop: str) -> None:
        self._ch_ctl = ch_ctl
        self._macros = self._ch_ctl.get_macros()
        # Fall back to the 'replica' macro when no explicit name was given.
        self._replica_to_drop = replica_to_drop or self._macros.get("replica")

        if not self._replica_to_drop:
            raise ConfigurationError(
                "Can't get the replica name. Please, specify it through macros or replica_name knob."
            )

    def clean_tables_metadata(self, replicated_tables: List[Table]) -> None:
        """
        Remove replica tables metadata from zookeeper.

        Unknown-path errors from ClickHouse are logged and ignored; any
        other ClickhouseError is re-raised.
        """
        replicated_table_paths = get_table_zookeeper_paths(replicated_tables)

        for table, table_path in replicated_table_paths:
            # The configured zookeeper path may contain {database}/{table}/{uuid}
            # macros; resolve them per table on top of the server macros.
            table_macros = copy.copy(self._macros)
            macros_to_override = dict(
                database=table.database, table=table.name, uuid=table.uuid
            )
            table_macros.update(macros_to_override)

            path_resolved = os.path.abspath(replace_macros(table_path, table_macros))

            logging.debug(
                "Removing replica {} from table {} metadata from zookeeper {}.",
                self._replica_to_drop,
                # Fixed: the second component previously repeated table.database.
                f"{table.database}.{table.name}",
                path_resolved,
            )
            try:
                self._ch_ctl.system_drop_replica(
                    replica=self._replica_to_drop, zookeeper_path=path_resolved  # type: ignore
                )
            except ClickhouseError as ch_error:
                if "does not look like a table path" in str(ch_error):
                    logging.warning(
                        "System drop replica failed with: {}\n Will ignore it, probably different configuration for zookeeper or tables schema.",
                        repr(ch_error),
                    )
                else:
                    raise

    def clean_database_metadata(self, replicated_databases: List[Database]) -> None:
        """
        Remove replica database metadata from zookeeper.

        No-op (with a warning) on ClickHouse older than 23.3, which lacks
        SYSTEM DROP DATABASE REPLICA ... FROM ZKPATH.
        """
        if not self._ch_ctl.ch_version_ge("23.3"):
            logging.warning(
                "Ch version is too old, will skip replicated database cleanup."
            )
            return

        replicated_databases_paths = get_database_zookeeper_paths(replicated_databases)

        for database, database_path, shard in replicated_databases_paths:
            db_macros = copy.copy(self._macros)

            macros_to_override = dict(database=database.name, uuid=database.uuid)
            db_macros.update(macros_to_override)

            path_resolved = os.path.abspath(replace_macros(database_path, db_macros))
            # Replicated database replicas are addressed as "<shard>|<replica>".
            full_replica_name = (
                f"{replace_macros(shard, db_macros)}|{self._replica_to_drop}"
            )

            logging.debug(
                "Removing replica {} from database {} metadata from zookeeper {}.",
                full_replica_name,
                database.name,
                path_resolved,
            )
            try:
                self._ch_ctl.system_drop_database_replica(
                    replica=full_replica_name, zookeeper_path=path_resolved  # type: ignore
                )
            except ClickhouseError as ch_error:
                if "does not look like a path of Replicated database" in str(
                    ch_error
                ) or "node doesn't exist" in str(ch_error):
                    logging.warning(
                        "System drop database replica failed with: {}\n Will ignore it, probably different configuration for zookeeper or database schema.",
                        repr(ch_error),
                    )
                else:
                    raise
9 changes: 8 additions & 1 deletion ch_backup/clickhouse/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,19 @@ class Database(SimpleNamespace):
"""

def __init__(
self, name: str, engine: Optional[str], metadata_path: Optional[str]
self,
name: str,
engine: Optional[str],
metadata_path: Optional[str],
uuid: Optional[str],
engine_full: Optional[str],
) -> None:
super().__init__()
self.name = name
self.engine = engine
self.metadata_path = metadata_path
self.uuid = uuid
self.engine_full = engine_full

def is_atomic(self) -> bool:
"""
Expand Down
1 change: 1 addition & 0 deletions ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def _as_seconds(t: str) -> int:
"freeze_timeout": _as_seconds("45 min"),
"unfreeze_timeout": _as_seconds("1 hour"),
"restore_replica_timeout": _as_seconds("30 min"),
"drop_replica_timeout": _as_seconds("1 hour"),
"user": "clickhouse",
"group": "clickhouse",
"clickhouse_user": None,
Expand Down
16 changes: 14 additions & 2 deletions ch_backup/logic/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
Clickhouse backup logic for databases
"""

from typing import Dict, Sequence
from typing import Dict, Optional, Sequence

from ch_backup import logging
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.metadata_cleaner import MetadataCleaner
from ch_backup.clickhouse.models import Database
from ch_backup.clickhouse.schema import (
embedded_schema_db_sql,
Expand All @@ -32,7 +33,10 @@ def backup(self, context: BackupContext, databases: Sequence[Database]) -> None:

@staticmethod
def restore(
context: BackupContext, databases: Dict[str, Database], keep_going: bool
context: BackupContext,
databases: Dict[str, Database],
keep_going: bool,
metadata_cleaner: Optional[MetadataCleaner],
) -> None:
"""
Restore database objects.
Expand All @@ -56,6 +60,14 @@ def restore(
databases_to_restore[name] = db
continue

if metadata_cleaner:
replicated_databases = [
database
for database in databases_to_restore.values()
if database.is_replicated_db_engine()
]
metadata_cleaner.clean_database_metadata(replicated_databases)

logging.info("Restoring databases: {}", ", ".join(databases_to_restore.keys()))
for db in databases_to_restore.values():
if db.has_embedded_metadata():
Expand Down
Loading

0 comments on commit 849cd5b

Please sign in to comment.