Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate with ClickHouse table #159

Merged
merged 11 commits into from
Jun 17, 2024
201 changes: 92 additions & 109 deletions ch_backup/backup/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections import defaultdict
from copy import copy
from datetime import timedelta
from typing import Dict, List, Optional, Sequence, Set
from typing import Dict, List, Sequence, Set

from ch_backup import logging
from ch_backup.backup.layout import BackupLayout
Expand All @@ -22,6 +22,9 @@ class PartDedupInfo(Slotted):
"""

__slots__ = (
"database",
"table",
"name",
"backup_path",
"checksum",
"size",
Expand All @@ -31,8 +34,12 @@ class PartDedupInfo(Slotted):
"verified",
)

# pylint: disable=too-many-arguments
def __init__(
self,
database: str,
table: str,
name: str,
backup_path: str,
checksum: str,
size: int,
Expand All @@ -41,6 +48,9 @@ def __init__(
disk_name: str,
verified: bool,
) -> None:
self.database = database
self.table = table
self.name = name
self.backup_path = backup_path
self.checksum = checksum
self.size = size
Expand All @@ -49,69 +59,41 @@ def __init__(
self.disk_name = disk_name
self.verified = verified


TableDedupInfo = Dict[str, PartDedupInfo]


class DatabaseDedupInfo:
"""
Information about data parts of single database to use for deduplication and creation of incremental backups.
"""

def __init__(self, tables: Dict[str, TableDedupInfo] = None) -> None:
if tables is None:
tables = defaultdict(dict)
self._tables = tables

def table(self, table_name: str) -> TableDedupInfo:
def to_sql(self):
"""
Return deduplication information for the table.
Convert to string to use it in insert query
"""
return self._tables[table_name]

def __repr__(self):
return f"DatabaseDedupInfo({dict(self._tables)})"
files_array = "[" + ",".join(f"'{file}'" for file in self.files) + "]"
return f"('{self.database}','{self.table}','{self.name}','{self.backup_path}','{self.checksum}',{self.size},{files_array},{int(self.tarball)},'{self.disk_name}',{int(self.verified)})"

def __eq__(self, other):
return self.__dict__ == other.__dict__

TableDedupReferences = Set[str]

class DedupInfo:
"""
Information about data parts of all databases to use for deduplication and creation of incremental backups.
"""

def __init__(self, databases: Dict[str, DatabaseDedupInfo] = None) -> None:
if databases is None:
databases = defaultdict(DatabaseDedupInfo)
self._databases = databases
DatabaseDedupReferences = Dict[str, TableDedupReferences]

def database(self, database_name: str) -> DatabaseDedupInfo:
"""
Return deduplication information for the database.
"""
return self._databases[database_name]
DedupReferences = Dict[str, DatabaseDedupReferences]

def __repr__(self):
return f"DedupInfo({dict(self._databases)})"

def __eq__(self, other):
return self.__dict__ == other.__dict__
def _create_empty_dedup_references() -> DedupReferences:
    """
    Build an empty dedup references mapping.

    The result is a two-level defaultdict: looking up a missing database name
    creates a per-database defaultdict, and looking up a missing table name
    inside it creates an empty set, so callers can add entries without
    initializing the intermediate levels first.
    """

    def _new_database_references():
        # Per-database level: each missing key maps to a fresh empty set.
        return defaultdict(set)

    return defaultdict(_new_database_references)


def collect_dedup_info(
context: BackupContext,
databases: Sequence[Database],
backups_with_light_meta: List[BackupMetadata],
) -> DedupInfo:
) -> None:
"""
Collect deduplication information for creating incremental backups.
"""
dedup_info = DedupInfo()

# Do not collect deduplication info if we are creating a schema-only backup.
if context.backup_meta.schema_only:
return dedup_info
return

context.ch_ctl.create_deduplication_table()

backup_age_limit = None
if context.config.get("deduplicate_parts"):
Expand All @@ -131,15 +113,11 @@ def collect_dedup_info(
dedup_backups.append(backup)

_populate_dedup_info(
dedup_info,
context.backup_layout,
context.backup_meta.hostname,
context,
dedup_backups,
databases,
)

return dedup_info


class _DatabaseToHandle:
def __init__(self, name, replicated_tables=False, nonreplicated_tables=False):
Expand All @@ -156,20 +134,23 @@ def handled(self):


def _populate_dedup_info(
dedup_info: DedupInfo,
layout: BackupLayout,
hostname: str,
context: BackupContext,
dedup_backups_with_light_meta: List[BackupMetadata],
databases: Sequence[Database],
) -> None:
# pylint: disable=too-many-locals,too-many-branches
layout = context.backup_layout
# Used to check if part is already collected for deduplication
dedup_info = _create_empty_dedup_references()
dedup_batch_size = context.config["deduplication_batch_size"]

databases_to_handle = {db.name: _DatabaseToHandle(db.name) for db in databases}
dedup_backup_paths = set(backup.path for backup in dedup_backups_with_light_meta)
for backup in dedup_backups_with_light_meta:
backup = layout.reload_backup(backup, use_light_meta=False)

# Process only replicated tables if backup is created on replica.
only_replicated = hostname != backup.hostname
only_replicated = context.backup_meta.hostname != backup.hostname

databases_to_iterate = []
for db_name in backup.get_databases():
Expand All @@ -187,8 +168,9 @@ def _populate_dedup_info(
if db.handled:
del databases_to_handle[db_name]

dedup_info_batch = []
for db in databases_to_iterate:
db_dedup_info = dedup_info.database(db.name)
db_dedup_info = dedup_info[db.name]
for table in backup.get_tables(db.name):
replicated = is_replicated(table.engine)
if replicated and db.replicated_tables_handled:
Expand All @@ -198,7 +180,7 @@ def _populate_dedup_info(
):
continue

table_dedup_info = db_dedup_info.table(table.name)
table_dedup_info = db_dedup_info[table.name]
for part in table.get_parts():
if part.name in table_dedup_info:
continue
Expand All @@ -212,7 +194,10 @@ def _populate_dedup_info(
verified = False
backup_path = backup.path

table_dedup_info[part.name] = PartDedupInfo(
part_dedup = PartDedupInfo(
database=db.name,
table=table.name,
name=part.name,
backup_path=backup_path,
checksum=part.checksum,
size=part.size,
Expand All @@ -222,60 +207,65 @@ def _populate_dedup_info(
verified=verified,
)

table_dedup_info.add(part.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are the part names from table_dedup_info used?

dedup_info_batch.append(part_dedup.to_sql())

if len(dedup_info_batch) >= dedup_batch_size:
context.ch_ctl.insert_deduplication_info(dedup_info_batch)
dedup_info_batch.clear()

if dedup_info_batch:
context.ch_ctl.insert_deduplication_info(dedup_info_batch)

if not databases_to_handle:
break


def deduplicate_part(
layout: BackupLayout, fpart: FrozenPart, dedup_info: TableDedupInfo
) -> Optional[PartMetadata]:
def deduplicate_parts(
context: BackupContext,
database: str,
table: str,
frozen_parts: Dict[str, FrozenPart],
) -> Dict[str, PartMetadata]:
"""
Deduplicate part if it's possible.
"""
part_name = fpart.name

logging.debug('Looking for deduplication of part "{}"', part_name)

existing_part = dedup_info.get(part_name)
if not existing_part:
return None

if existing_part.checksum != fpart.checksum:
return None

part = PartMetadata(
database=fpart.database,
table=fpart.table,
name=part_name,
checksum=existing_part.checksum,
size=existing_part.size,
link=existing_part.backup_path,
files=existing_part.files,
tarball=existing_part.tarball,
disk_name=existing_part.disk_name,
)
layout = context.backup_layout

if not existing_part.verified:
if not layout.check_data_part(existing_part.backup_path, part):
logging.debug(
'Part "{}" found in "{}", but it\'s invalid, skipping',
part_name,
existing_part.backup_path,
)
return None

logging.debug(
'Part "{}" found in "{}", reusing', part_name, existing_part.backup_path
existing_parts = context.ch_ctl.get_deduplication_info(
database, table, frozen_parts
)
deduplicated_parts: Dict[str, PartMetadata] = {}

for existing_part in existing_parts:
part = PartMetadata(
database=database,
table=table,
name=existing_part["name"],
checksum=existing_part["checksum"],
size=int(existing_part["size"]),
link=existing_part["backup_path"],
files=existing_part["files"],
tarball=existing_part["tarball"],
disk_name=existing_part["disk_name"],
)

return part

if not existing_part["verified"]:
if not layout.check_data_part(existing_part["backup_path"], part):
logging.debug(
'Part "{}" found in "{}", but it\'s invalid, skipping',
part.name,
existing_part["backup_path"],
)
continue

TableDedupReferences = Set[str]
deduplicated_parts[part.name] = part

DatabaseDedupReferences = Dict[str, TableDedupReferences]
logging.debug(
'Part "{}" found in "{}", reusing', part.name, existing_part["backup_path"]
)

DedupReferences = Dict[str, DatabaseDedupReferences]
return deduplicated_parts


def collect_dedup_references_for_batch_backup_deletion(
Expand All @@ -287,7 +277,9 @@ def collect_dedup_references_for_batch_backup_deletion(
Collect deduplication information for deleting multiple backups. It contains names of data parts that should
be preserved during deletion.
"""
dedup_references: Dict[str, DedupReferences] = defaultdict(dict)
dedup_references: Dict[str, DedupReferences] = defaultdict(
_create_empty_dedup_references
)

deleting_backup_name_resolver = {
b.path: b.name for b in deleting_backups_light_meta
Expand All @@ -312,13 +304,4 @@ def collect_dedup_references_for_batch_backup_deletion(
def _add_part_to_dedup_references(
dedup_references: DedupReferences, part: PartMetadata
) -> None:
if part.database not in dedup_references:
dedup_references[part.database] = {part.table: {part.name}}
return

db_dedup_references = dedup_references[part.database]
if part.table not in db_dedup_references:
db_dedup_references[part.table] = {part.name}
return

db_dedup_references[part.table].add(part.name)
dedup_references[part.database][part.table].add(part.name)
4 changes: 3 additions & 1 deletion ch_backup/backup_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ def ch_ctl(self) -> ClickhouseCTL:
Getter ch_ctl
"""
if not hasattr(self, "_ch_ctl"):
self._ch_ctl = ClickhouseCTL(self._ch_ctl_conf, self._main_conf)
self._ch_ctl = ClickhouseCTL(
self._ch_ctl_conf, self._main_conf, self._config
)
return self._ch_ctl

@ch_ctl.setter
Expand Down
9 changes: 4 additions & 5 deletions ch_backup/ch_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _context(self) -> BackupContext:
ctx.ch_ctl_conf = self._config["clickhouse"]
ctx.main_conf = self._config["main"]

ctx.ch_ctl = ClickhouseCTL(ctx.ch_ctl_conf, ctx.main_conf)
ctx.ch_ctl = ClickhouseCTL(ctx.ch_ctl_conf, ctx.main_conf, ctx.config)
ctx.backup_layout = BackupLayout(self._config)

ctx.config = self._config["backup"]
Expand Down Expand Up @@ -182,7 +182,7 @@ def backup(
self._nc_backup_manager.backup(self._context)
if sources.schemas_included():
self._database_backup_manager.backup(self._context, databases)
dedup_info = collect_dedup_info(
collect_dedup_info(
context=self._context,
backups_with_light_meta=backups_with_light_meta,
databases=databases,
Expand All @@ -191,7 +191,6 @@ def backup(
self._context,
databases,
db_tables,
dedup_info,
schema_only=sources.schema_only,
)

Expand Down Expand Up @@ -450,10 +449,10 @@ def _delete(

logging.info("Removing non-shared backup data parts")
for db_name in backup.get_databases():
db_dedup_references = dedup_references.get(db_name, {})
db_dedup_references = dedup_references[db_name]
for table in backup.get_tables(db_name):
self._delete_data_parts(
backup, table, db_dedup_references.get(table.name)
backup, table, db_dedup_references[table.name]
)

self._context.ch_ctl.system_unfreeze(backup.name)
Expand Down
Loading