Commit
Force metadata cleanup and skip non-existent objects
MikhailBurdukov committed Oct 29, 2024
1 parent cf766e2 commit 0bf6988
Showing 4 changed files with 97 additions and 3 deletions.
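
In short, the cleaner now talks to ZooKeeper directly: objects whose nodes no longer exist are skipped, and for the rest the replica's active flag is deleted before SYSTEM DROP REPLICA is scheduled, since ClickHouse rejects the drop for a replica that still looks active. Below is a minimal sketch of that flow, not the committed code: zk_client, zk_root_path and system_drop_replica follow names from the diff, while force_drop_table_replica and drop_replica are illustrative.

# Sketch of the forced-cleanup flow introduced by this commit; illustrative, not the exact code.
import os

from kazoo.exceptions import NoNodeError


def force_drop_table_replica(zk_client, zk_root_path, table_zk_path, replica, drop_replica):
    """Skip tables that have no ZooKeeper nodes; otherwise clear the replica's
    is_active flag so the subsequent SYSTEM DROP REPLICA is not rejected."""
    full_path = zk_root_path + table_zk_path
    if not zk_client.exists(full_path):
        return False  # non-existent object: nothing to clean up, skip it

    active_flag = os.path.join(full_path, "replicas", replica, "is_active")
    try:
        if zk_client.exists(active_flag):
            zk_client.delete(active_flag)  # force: the replica no longer looks active
    except NoNodeError:
        pass  # the flag disappeared between the check and the delete

    drop_replica(replica, table_zk_path)  # e.g. ClickhouseCTL.system_drop_replica
    return True
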
1 change: 1 addition & 0 deletions ch_backup/ch_backup.py
@@ -524,6 +524,7 @@ def _restore(
if clean_zookeeper and len(self._context.zk_config.get("hosts")) > 0:
metadata_cleaner = MetadataCleaner(
self._context.ch_ctl,
self._context.zk_ctl,
select_replica_drop(
replica_name, self._context.ch_ctl.get_macros()
),
59 changes: 56 additions & 3 deletions ch_backup/clickhouse/metadata_cleaner.py
@@ -7,6 +7,8 @@
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List, Optional

from kazoo.exceptions import NoNodeError

from ch_backup import logging
from ch_backup.clickhouse.client import ClickhouseError
from ch_backup.clickhouse.control import ClickhouseCTL
@@ -17,6 +19,7 @@
get_table_zookeeper_paths,
replace_macros,
)
from ch_backup.zookeeper.zookeeper import ZookeeperCTL


def select_replica_drop(replica_name: Optional[str], macros: Dict) -> str:
@@ -40,9 +43,14 @@ class MetadataCleaner:
"""

def __init__(
- self, ch_ctl: ClickhouseCTL, replica_to_drop: str, max_workers: int
+ self,
+ ch_ctl: ClickhouseCTL,
+ zk_ctl: ZookeeperCTL,
+ replica_to_drop: str,
+ max_workers: int,
) -> None:
self._ch_ctl = ch_ctl
self._zk_ctl = zk_ctl
self._macros = self._ch_ctl.get_macros()
self._replica_to_drop = replica_to_drop or self._macros.get("replica")
if not self._replica_to_drop:
@@ -65,17 +73,41 @@ def clean_tables_metadata(self, replicated_tables: List[Table]) -> None:
table_macros.update(macros_to_override)

path_resolved = os.path.abspath(replace_macros(table_path, table_macros))
full_table_name = f"{table.database}.{table.name}"

with self._zk_ctl.zk_client as zk_client:
# Both paths are already absolute.
full_table_zk_path = self._zk_ctl.zk_root_path + path_resolved

if not zk_client.exists(full_table_zk_path):
logging.debug(
"There are no nodes for the replicated table {} with zk path {}",
full_table_name,
full_table_zk_path,
)
continue

# We are sure that we want to drop the table from ZooKeeper.
# To force the drop, we remove its is_active flag.
active_flag_path = os.path.join(
full_table_zk_path, "replicas", self._replica_to_drop, "is_active" # type: ignore
)
try:
if zk_client.exists(active_flag_path):
zk_client.delete(active_flag_path)
except NoNodeError:
pass

logging.debug(
"Scheduling drop replica {} from table {} metadata from zookeeper {}.",
self._replica_to_drop,
f"{table.database}.{table.database}",
full_table_name,
path_resolved,
)
future = self._exec_pool.submit(
self._ch_ctl.system_drop_replica, self._replica_to_drop, path_resolved # type: ignore
)
tasks[f"{table.database}.{table.name}"] = future
tasks[full_table_name] = future

for full_table_name, future in tasks.items():
try:
@@ -116,6 +148,27 @@ def clean_database_metadata(self, replicated_databases: List[Database]) -> None:
f"{replace_macros(shard, db_macros)}|{self._replica_to_drop}"
)

with self._zk_ctl.zk_client as zk_client:
# Both paths are already absolute.
full_database_zk_path = self._zk_ctl.zk_root_path + path_resolved

if not zk_client.exists(full_database_zk_path):
logging.debug(
"There are no nodes for the replicated database {} with zk path {}",
database.name,
full_database_zk_path,
)
continue

active_flag_path = os.path.join(
full_database_zk_path, "replicas", full_replica_name, "active"
)
try:
if zk_client.exists(active_flag_path):
zk_client.delete(active_flag_path)
except NoNodeError:
pass

logging.debug(
"Scheduling replica {} from database {} metadata from zookeeper {}.",
full_replica_name,
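
For context, a hedged sketch of how the updated constructor is wired together, mirroring the ch_backup.py hunk above; the context object, the worker count and the wrapper function are illustrative.

# Hypothetical wiring of the new zk_ctl argument; names mirror the hunks above, values are illustrative.
from ch_backup.clickhouse.metadata_cleaner import MetadataCleaner, select_replica_drop


def clean_replica_metadata(context, replica_name, replicated_tables, replicated_databases):
    cleaner = MetadataCleaner(
        context.ch_ctl,  # ClickhouseCTL: runs SYSTEM DROP REPLICA
        context.zk_ctl,  # ZookeeperCTL: existence checks and active-flag removal
        select_replica_drop(replica_name, context.ch_ctl.get_macros()),
        max_workers=4,  # illustrative; the real value comes from the backup config
    )
    cleaner.clean_tables_metadata(replicated_tables)
    cleaner.clean_database_metadata(replicated_databases)
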
34 changes: 34 additions & 0 deletions tests/integration/features/backup_replicated.feature
@@ -858,3 +858,37 @@ Feature: Backup replicated merge tree table
Examples:
| zookeeper_path |
|/databases/replicated/db_repl|

## Note: Sometimes we can have an active orphaned table in ZooKeeper.
## Here we imitate such a situation by creating objects with a static replica name (see the illustrative check after this scenario).
Scenario: Host resetup with active orphaned objects in zookeeper.
Given we have enabled shared zookeeper for clickhouse01
And we have enabled shared zookeeper for clickhouse02
And we have executed queries on clickhouse01
"""
DROP DATABASE IF EXISTS test_db SYNC;
CREATE DATABASE test_db;
CREATE TABLE test_db.table_01 (
EventDate DateTime,
CounterID UInt32,
UserID UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/shard01/test_db.table_01', 'replica')
PARTITION BY CounterID % 10
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID);
INSERT INTO test_db.table_01 SELECT now(), number, rand() FROM system.numbers LIMIT 10;
DROP DATABASE IF EXISTS db_repl SYNC;
CREATE DATABASE db_repl ENGINE = Replicated('/databases/replicated/db_repl', 'shard_01', 'replica');
"""
When we create clickhouse01 clickhouse backup
Then we got the following backups on clickhouse01
| num | state | data_count | link_count |
| 0 | created | 10 | 0 |

When we restore clickhouse backup #0 to clickhouse02
"""
replica_name: replica
schema_only: true
"""
6 changes: 6 additions & 0 deletions tests/integration/steps/clickhouse.py
@@ -6,6 +6,7 @@
from behave import given, then, when
from hamcrest import assert_that, equal_to, has_length
from tenacity import retry, stop_after_attempt, wait_fixed
from tests.integration.modules.ch_backup import BackupManager
from tests.integration.modules.clickhouse import ClickhouseClient
from tests.integration.modules.docker import get_container, put_file
from tests.integration.modules.steps import get_step_data
@@ -37,6 +38,11 @@ def step_enable_shared_zookeeper_for_clickhouse(context, node):
)
assert container.exec_run("supervisorctl restart clickhouse").exit_code == 0

shared_zk_path = "/" + render_template(context, "{{ conf.zk.shared_node }}")

dict_to_update = {"zookeeper": {"root_path": shared_zk_path}}
BackupManager(context, node).update_config(dict_to_update)


@given("clickhouse on {node:w} has test schema")
@when("clickhouse on {node:w} has test schema")
