Skip to content

Commit

Permalink
Take a lock on wider scope in delete and purge cmd. (#68)
Browse files Browse the repository at this point in the history
* Made lock scope wider and refactor the code.
  • Loading branch information
MikhailBurdukov authored Sep 12, 2023
1 parent cc4d41e commit a977419
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 95 deletions.
31 changes: 4 additions & 27 deletions ch_backup/backup/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,41 +273,18 @@ def deduplicate_part(
DedupReferences = Dict[str, DatabaseDedupReferences]


def collect_dedup_references_for_backup_deletion(
layout: BackupLayout,
retained_backups_with_light_meta: List[BackupMetadata],
deleting_backup_with_light_meta: BackupMetadata,
) -> DedupReferences:
"""
Collect deduplication information for deleting backup. It contains names of data parts that should pe preserved
during deletion.
"""
dedup_refences = collect_dedup_references_for_batch_backup_deletion(
layout=layout,
retained_backups_with_light_meta=retained_backups_with_light_meta,
deleting_backups_with_light_meta=[deleting_backup_with_light_meta],
)

return dedup_refences[deleting_backup_with_light_meta.name]


def collect_dedup_references_for_batch_backup_deletion(
layout: BackupLayout,
retained_backups_with_light_meta: List[BackupMetadata],
deleting_backups_with_light_meta: List[BackupMetadata],
retained_backups: List[BackupMetadata],
deleting_backups: List[BackupMetadata],
) -> Dict[str, DedupReferences]:
"""
Collect deduplication information for deleting multiple backups. It contains names of data parts that should
pe preserved during deletion.
"""
dedup_references: Dict[str, DedupReferences] = defaultdict(dict)

deleting_backup_name_resolver = {
b.path: b.name for b in deleting_backups_with_light_meta
}
for backup in retained_backups_with_light_meta:
backup = layout.reload_backup(backup, use_light_meta=False)

deleting_backup_name_resolver = {b.path: b.name for b in deleting_backups}
for backup in retained_backups:
for db_name in backup.get_databases():
for table in backup.get_tables(db_name):
for part in table.get_parts():
Expand Down
117 changes: 55 additions & 62 deletions ch_backup/ch_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,29 +307,28 @@ def delete(
found = False
deleting_backups = []
retained_backups = []
for i, backup in enumerate(
self._context.backup_layout.get_backups(use_light_meta=True)
):
if backup.name == backup_name:
deleting_backups.append(backup)
found = True
continue

if purge_partial and backup.state != BackupState.CREATED and i != 0:
deleting_backups.append(backup)
else:
retained_backups.append(backup)

if not found:
raise BackupNotFound(backup_name)

dedup_references = collect_dedup_references_for_batch_backup_deletion(
layout=self._context.backup_layout,
retained_backups_with_light_meta=retained_backups,
deleting_backups_with_light_meta=deleting_backups,
)

with self._context.locker():
for i, backup in enumerate(
self._context.backup_layout.get_backups(use_light_meta=False)
):
if backup.name == backup_name:
deleting_backups.append(backup)
found = True
continue

if purge_partial and backup.state != BackupState.CREATED and i != 0:
deleting_backups.append(backup)
else:
retained_backups.append(backup)

if not found:
raise BackupNotFound(backup_name)

dedup_references = collect_dedup_references_for_batch_backup_deletion(
retained_backups=retained_backups,
deleting_backups=deleting_backups,
)

result: Tuple[Optional[str], Optional[str]] = (None, None)
for backup in deleting_backups:
deleted_name, msg = self._delete(backup, dedup_references[backup.name])
Expand Down Expand Up @@ -358,41 +357,40 @@ def purge(self) -> Tuple[Sequence[str], Optional[str]]:
retained_backups: List[BackupMetadata] = []
deleting_backups: List[BackupMetadata] = []
backup_names = self._context.backup_layout.get_backup_names()
for backup in self._context.backup_layout.get_backups(use_light_meta=True):
if backup.name not in backup_names:
logging.info("Deleting backup without metadata: %s", backup.name)
self._context.backup_layout.delete_backup(backup.name)
continue

if retain_count > 0:
logging.info(
"Preserving backup per retain count policy: %s, state %s",
backup.name,
backup.state,
)
retained_backups.append(backup)
if backup.state == BackupState.CREATED:
retain_count -= 1
continue

if retain_time_limit and backup.start_time >= retain_time_limit:
logging.info(
"Preserving backup per retain time policy: %s, state %s",
backup.name,
backup.state,
)
retained_backups.append(backup)
continue
with self._context.locker():
for backup in self._context.backup_layout.get_backups(use_light_meta=False):
if backup.name not in backup_names:
logging.info("Deleting backup without metadata: %s", backup.name)
self._context.backup_layout.delete_backup(backup.name)
continue

if retain_count > 0:
logging.info(
"Preserving backup per retain count policy: %s, state %s",
backup.name,
backup.state,
)
retained_backups.append(backup)
if backup.state == BackupState.CREATED:
retain_count -= 1
continue

if retain_time_limit and backup.start_time >= retain_time_limit:
logging.info(
"Preserving backup per retain time policy: %s, state %s",
backup.name,
backup.state,
)
retained_backups.append(backup)
continue

deleting_backups.append(backup)
deleting_backups.append(backup)

dedup_references = collect_dedup_references_for_batch_backup_deletion(
layout=self._context.backup_layout,
retained_backups_with_light_meta=retained_backups,
deleting_backups_with_light_meta=deleting_backups,
)
dedup_references = collect_dedup_references_for_batch_backup_deletion(
retained_backups=retained_backups,
deleting_backups=deleting_backups,
)

with self._context.locker():
for backup in deleting_backups:
backup_name, _ = self._delete(backup, dedup_references[backup.name])
if backup_name:
Expand All @@ -412,18 +410,13 @@ def fix_admin_user(self, dry_run: bool = True) -> None:
self._access_backup_manager.fix_admin_user(self._context, dry_run)

def _delete(
self, backup_with_light_meta: BackupMetadata, dedup_references: DedupReferences
self, backup: BackupMetadata, dedup_references: DedupReferences
) -> Tuple[Optional[str], Optional[str]]:
logging.info(
"Deleting backup %s, state: %s",
backup_with_light_meta.name,
backup_with_light_meta.state,
backup.name,
backup.state,
)

backup = self._context.backup_layout.reload_backup(
backup_with_light_meta, use_light_meta=False
)

backup.state = BackupState.DELETING
self._context.backup_layout.upload_backup_metadata(backup)

Expand Down
21 changes: 15 additions & 6 deletions tests/unit/test_deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ def test_collect_dedup_info(config, creating_backup, databases, backups, result)
{
"id": "single data part",
"args": {
"retained_backups": [
"retained_backups_light_meta": [
backup_metadata(
name="backup2",
state=BackupState.CREATED,
Expand All @@ -529,7 +529,7 @@ def test_collect_dedup_info(config, creating_backup, databases, backups, result)
},
),
],
"deleting_backups": [
"deleting_backups_light_meta": [
backup_metadata(
name="backup1",
state=BackupState.CREATED,
Expand All @@ -556,13 +556,22 @@ def test_collect_dedup_info(config, creating_backup, databases, backups, result)
}
)
def test_collect_dedup_references_for_batch_backup_deletion(
retained_backups, deleting_backups, result
retained_backups_light_meta, deleting_backups_light_meta, result
):
layout = layout_mock()
retained_backups = [
layout.reload_backup(backup_light, False)
for backup_light in retained_backups_light_meta
]
deleting_backups = [
layout.reload_backup(backup_light, False)
for backup_light in deleting_backups_light_meta
]

assert (
collect_dedup_references_for_batch_backup_deletion(
layout=layout_mock(),
retained_backups_with_light_meta=retained_backups,
deleting_backups_with_light_meta=deleting_backups,
retained_backups=retained_backups,
deleting_backups=deleting_backups,
)
== result
)
Expand Down

0 comments on commit a977419

Please sign in to comment.