Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate with ClickHouse table #159

Merged
merged 11 commits into from
Jun 17, 2024
196 changes: 88 additions & 108 deletions ch_backup/backup/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections import defaultdict
from copy import copy
from datetime import timedelta
from typing import Dict, List, Optional, Sequence, Set
from typing import DefaultDict, Dict, List, Sequence

from ch_backup import logging
from ch_backup.backup.layout import BackupLayout
Expand All @@ -22,6 +22,9 @@ class PartDedupInfo(Slotted):
"""

__slots__ = (
"database",
"table",
"name",
"backup_path",
"checksum",
"size",
Expand All @@ -31,8 +34,12 @@ class PartDedupInfo(Slotted):
"verified",
)

# pylint: disable=too-many-arguments
def __init__(
self,
database: str,
table: str,
name: str,
backup_path: str,
checksum: str,
size: int,
Expand All @@ -41,6 +48,9 @@ def __init__(
disk_name: str,
verified: bool,
) -> None:
self.database = database
self.table = table
self.name = name
self.backup_path = backup_path
self.checksum = checksum
self.size = size
Expand All @@ -49,69 +59,41 @@ def __init__(
self.disk_name = disk_name
self.verified = verified


TableDedupInfo = Dict[str, PartDedupInfo]


class DatabaseDedupInfo:
"""
Information about data parts of single database to use for deduplication and creation of incremental backups.
"""

def __init__(self, tables: Dict[str, TableDedupInfo] = None) -> None:
if tables is None:
tables = defaultdict(dict)
self._tables = tables

def table(self, table_name: str) -> TableDedupInfo:
def to_sql(self):
"""
Return deduplication information for the table.
Convert to string to use it in insert query
"""
return self._tables[table_name]

def __repr__(self):
return f"DatabaseDedupInfo({dict(self._tables)})"
files_array = "[" + ",".join(f"'{file}'" for file in self.files) + "]"
return f"('{self.database}','{self.table}','{self.name}','{self.backup_path}','{self.checksum}',{self.size},{files_array},{int(self.tarball)},'{self.disk_name}',{int(self.verified)})"

def __eq__(self, other):
return self.__dict__ == other.__dict__

TableDedupReferences = set
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use Set[<needed type>] here and use it only for annotations.


class DedupInfo:
"""
Information about data parts of all databases to use for deduplication and creation of incremental backups.
"""

def __init__(self, databases: Dict[str, DatabaseDedupInfo] = None) -> None:
if databases is None:
databases = defaultdict(DatabaseDedupInfo)
self._databases = databases
DatabaseDedupReferences = DefaultDict[str, TableDedupReferences]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to use a more general type for annotations; e.g., Dict would be better here.


def database(self, database_name: str) -> DatabaseDedupInfo:
"""
Return deduplication information for the database.
"""
return self._databases[database_name]
DedupReferences = DefaultDict[str, DatabaseDedupReferences]

def __repr__(self):
return f"DedupInfo({dict(self._databases)})"

def __eq__(self, other):
return self.__dict__ == other.__dict__
def _create_dedup_references() -> DedupReferences:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_create_empty_dedup_references ?

"""
Create empty dedup references
"""
return DedupReferences(lambda: DatabaseDedupReferences(TableDedupReferences))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code works, but it's better not to instantiate types from the typing module, because almost all of them do not allow instantiation.
Just for consistency, let's use defaultdict.



def collect_dedup_info(
context: BackupContext,
databases: Sequence[Database],
backups_with_light_meta: List[BackupMetadata],
) -> DedupInfo:
) -> None:
"""
Collect deduplication information for creating incremental backups.
"""
dedup_info = DedupInfo()
context.ch_ctl.create_deduplication_table()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this after checking schema_only?


# Do not populate DedupInfo if we are creating schema-only backup.
if context.backup_meta.schema_only:
return dedup_info
return

backup_age_limit = None
if context.config.get("deduplicate_parts"):
Expand All @@ -131,15 +113,11 @@ def collect_dedup_info(
dedup_backups.append(backup)

_populate_dedup_info(
dedup_info,
context.backup_layout,
context.backup_meta.hostname,
context,
dedup_backups,
databases,
)

return dedup_info


class _DatabaseToHandle:
def __init__(self, name, replicated_tables=False, nonreplicated_tables=False):
Expand All @@ -156,20 +134,22 @@ def handled(self):


def _populate_dedup_info(
dedup_info: DedupInfo,
layout: BackupLayout,
hostname: str,
context: BackupContext,
dedup_backups_with_light_meta: List[BackupMetadata],
databases: Sequence[Database],
) -> None:
# pylint: disable=too-many-locals,too-many-branches
layout = context.backup_layout
dedup_info = _create_dedup_references()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need dedup_info variable ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dict is used here to check whether a processed part has already been collected for deduplication, for example from a previous metadata file. Not sure whether that can happen when deduplication works correctly, but this check existed before.
We could instead perform this check via the ClickHouse table, by collecting the parts' metadata in batches and filtering them with a JOIN query.

Copy link
Contributor

@aalexfvk aalexfvk Jun 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, we will not transfer this logic to SQL level yet. But let's add a comment about its purpose.

dedup_batch_size = context.config["deduplication_batch_size"]

databases_to_handle = {db.name: _DatabaseToHandle(db.name) for db in databases}
dedup_backup_paths = set(backup.path for backup in dedup_backups_with_light_meta)
for backup in dedup_backups_with_light_meta:
backup = layout.reload_backup(backup, use_light_meta=False)

# Process only replicated tables if backup is created on replica.
only_replicated = hostname != backup.hostname
only_replicated = context.backup_meta.hostname != backup.hostname

databases_to_iterate = []
for db_name in backup.get_databases():
Expand All @@ -187,8 +167,9 @@ def _populate_dedup_info(
if db.handled:
del databases_to_handle[db_name]

dedup_info_batch = []
for db in databases_to_iterate:
db_dedup_info = dedup_info.database(db.name)
db_dedup_info = dedup_info[db.name]
for table in backup.get_tables(db.name):
replicated = is_replicated(table.engine)
if replicated and db.replicated_tables_handled:
Expand All @@ -198,7 +179,7 @@ def _populate_dedup_info(
):
continue

table_dedup_info = db_dedup_info.table(table.name)
table_dedup_info = db_dedup_info[table.name]
for part in table.get_parts():
if part.name in table_dedup_info:
continue
Expand All @@ -212,7 +193,10 @@ def _populate_dedup_info(
verified = False
backup_path = backup.path

table_dedup_info[part.name] = PartDedupInfo(
part_dedup = PartDedupInfo(
database=db.name,
table=table.name,
name=part.name,
backup_path=backup_path,
checksum=part.checksum,
size=part.size,
Expand All @@ -222,60 +206,65 @@ def _populate_dedup_info(
verified=verified,
)

table_dedup_info.add(part.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are the part names from table_dedup_info used?

dedup_info_batch.append(part_dedup.to_sql())

if len(dedup_info_batch) >= dedup_batch_size:
context.ch_ctl.insert_deduplication_info(dedup_info_batch)
dedup_info_batch.clear()

if dedup_info_batch:
context.ch_ctl.insert_deduplication_info(dedup_info_batch)

if not databases_to_handle:
break


def deduplicate_part(
layout: BackupLayout, fpart: FrozenPart, dedup_info: TableDedupInfo
) -> Optional[PartMetadata]:
def deduplicate_parts(
context: BackupContext,
database: str,
table: str,
frozen_parts: Dict[str, FrozenPart],
) -> Dict[str, PartMetadata]:
"""
Deduplicate part if it's possible.
"""
part_name = fpart.name

logging.debug('Looking for deduplication of part "{}"', part_name)

existing_part = dedup_info.get(part_name)
if not existing_part:
return None

if existing_part.checksum != fpart.checksum:
return None

part = PartMetadata(
database=fpart.database,
table=fpart.table,
name=part_name,
checksum=existing_part.checksum,
size=existing_part.size,
link=existing_part.backup_path,
files=existing_part.files,
tarball=existing_part.tarball,
disk_name=existing_part.disk_name,
)
layout = context.backup_layout

if not existing_part.verified:
if not layout.check_data_part(existing_part.backup_path, part):
logging.debug(
'Part "{}" found in "{}", but it\'s invalid, skipping',
part_name,
existing_part.backup_path,
)
return None

logging.debug(
'Part "{}" found in "{}", reusing', part_name, existing_part.backup_path
existing_parts = context.ch_ctl.get_deduplication_info(
database, table, frozen_parts
)
deduplicated_parts: Dict[str, PartMetadata] = {}

for existing_part in existing_parts:
part = PartMetadata(
database=database,
table=table,
name=existing_part["name"],
checksum=existing_part["checksum"],
size=int(existing_part["size"]),
link=existing_part["backup_path"],
files=existing_part["files"],
tarball=existing_part["tarball"],
disk_name=existing_part["disk_name"],
)

return part

if not existing_part["verified"]:
if not layout.check_data_part(existing_part["backup_path"], part):
logging.debug(
'Part "{}" found in "{}", but it\'s invalid, skipping',
part.name,
existing_part["backup_path"],
)
continue

TableDedupReferences = Set[str]
deduplicated_parts[part.name] = part

DatabaseDedupReferences = Dict[str, TableDedupReferences]
logging.debug(
'Part "{}" found in "{}", reusing', part.name, existing_part["backup_path"]
)

DedupReferences = Dict[str, DatabaseDedupReferences]
return deduplicated_parts


def collect_dedup_references_for_batch_backup_deletion(
Expand All @@ -287,7 +276,7 @@ def collect_dedup_references_for_batch_backup_deletion(
Collect deduplication information for deleting multiple backups. It contains names of data parts that should
be preserved during deletion.
"""
dedup_references: Dict[str, DedupReferences] = defaultdict(dict)
dedup_references: Dict[str, DedupReferences] = defaultdict(_create_dedup_references)

deleting_backup_name_resolver = {
b.path: b.name for b in deleting_backups_light_meta
Expand All @@ -312,13 +301,4 @@ def collect_dedup_references_for_batch_backup_deletion(
def _add_part_to_dedup_references(
dedup_references: DedupReferences, part: PartMetadata
) -> None:
if part.database not in dedup_references:
dedup_references[part.database] = {part.table: {part.name}}
return

db_dedup_references = dedup_references[part.database]
if part.table not in db_dedup_references:
db_dedup_references[part.table] = {part.name}
return

db_dedup_references[part.table].add(part.name)
dedup_references[part.database][part.table].add(part.name)
4 changes: 3 additions & 1 deletion ch_backup/backup_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ def ch_ctl(self) -> ClickhouseCTL:
Getter ch_ctl
"""
if not hasattr(self, "_ch_ctl"):
self._ch_ctl = ClickhouseCTL(self._ch_ctl_conf, self._main_conf)
self._ch_ctl = ClickhouseCTL(
self._ch_ctl_conf, self._main_conf, self._config
)
return self._ch_ctl

@ch_ctl.setter
Expand Down
9 changes: 4 additions & 5 deletions ch_backup/ch_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _context(self) -> BackupContext:
ctx.ch_ctl_conf = self._config["clickhouse"]
ctx.main_conf = self._config["main"]

ctx.ch_ctl = ClickhouseCTL(ctx.ch_ctl_conf, ctx.main_conf)
ctx.ch_ctl = ClickhouseCTL(ctx.ch_ctl_conf, ctx.main_conf, ctx.config)
ctx.backup_layout = BackupLayout(self._config)

ctx.config = self._config["backup"]
Expand Down Expand Up @@ -182,7 +182,7 @@ def backup(
self._nc_backup_manager.backup(self._context)
if sources.schemas_included():
self._database_backup_manager.backup(self._context, databases)
dedup_info = collect_dedup_info(
collect_dedup_info(
context=self._context,
backups_with_light_meta=backups_with_light_meta,
databases=databases,
Expand All @@ -191,7 +191,6 @@ def backup(
self._context,
databases,
db_tables,
dedup_info,
schema_only=sources.schema_only,
)

Expand Down Expand Up @@ -450,10 +449,10 @@ def _delete(

logging.info("Removing non-shared backup data parts")
for db_name in backup.get_databases():
db_dedup_references = dedup_references.get(db_name, {})
db_dedup_references = dedup_references[db_name]
for table in backup.get_tables(db_name):
self._delete_data_parts(
backup, table, db_dedup_references.get(table.name)
backup, table, db_dedup_references[table.name]
)

self._context.ch_ctl.system_unfreeze(backup.name)
Expand Down
Loading