diff --git a/src/middlewared/middlewared/plugins/cloud_backup/snapshot.py b/src/middlewared/middlewared/plugins/cloud_backup/snapshot.py index 2cf699a88043d..e0d7c3d169ea3 100644 --- a/src/middlewared/middlewared/plugins/cloud_backup/snapshot.py +++ b/src/middlewared/middlewared/plugins/cloud_backup/snapshot.py @@ -3,8 +3,8 @@ import subprocess from middlewared.plugins.cloud_backup.restic import get_restic_config -from middlewared.schema import accepts, Bool, Datetime, Dict, Int, List, returns, Str -from middlewared.service import CallError, Service +from middlewared.schema import accepts, Datetime, Dict, Int, List, returns, Str +from middlewared.service import CallError, job, Service from middlewared.validators import NotMatch @@ -99,3 +99,27 @@ def list_snapshot_directory(self, id_, snapshot_id, path): contents.append(item) return contents + + @accepts(Int("id"), Str("snapshot_id", validators=[NotMatch(r"^-")])) + @returns() + @job(lock=lambda args: "cloud_backup:{}".format(args[-1]), lock_queue_size=1) + def delete_snapshot(self, job, id_, snapshot_id): + """ + Delete snapshot `snapshot_id` created by the cloud backup job `id`. + """ + self.middleware.call_sync("network.general.will_perform_activity", "cloud_backup") + + cloud_backup = self.middleware.call_sync("cloud_backup.get_instance", id_) + + restic_config = get_restic_config(cloud_backup) + + try: + subprocess.run( + restic_config.cmd + ["forget", snapshot_id, "--prune"], + env=restic_config.env, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + raise CallError(e.stderr) diff --git a/tests/api2/test_cloud_backup.py b/tests/api2/test_cloud_backup.py index c7daebddd224b..b35e159562bd0 100644 --- a/tests/api2/test_cloud_backup.py +++ b/tests/api2/test_cloud_backup.py @@ -109,6 +109,12 @@ def test_cloud_backup(cloud_backup_task): snapshots = call("cloud_backup.list_snapshots", cloud_backup_task.task["id"]) assert all(snapshot["id"] != first_snapshot["id"] for snapshot in snapshots) + snapshot_to_delete = snapshots[0] + call("cloud_backup.delete_snapshot", cloud_backup_task.task["id"], snapshot_to_delete["id"], job=True) + + snapshots = call("cloud_backup.list_snapshots", cloud_backup_task.task["id"]) + assert all(snapshot["id"] != snapshot_to_delete["id"] for snapshot in snapshots) + @pytest.fixture(scope="module") def completed_cloud_backup_task(s3_credential):