diff --git a/faust/stores/aerospike.py b/faust/stores/aerospike.py
index f95de6d56..97d485875 100644
--- a/faust/stores/aerospike.py
+++ b/faust/stores/aerospike.py
@@ -262,3 +262,27 @@ def aerospike_fun_call_with_retry(self, fun, *args, **kwargs):
                     ex
                 )  # crash the app to prevent the offset from progressing
             raise ex
+
+    async def backup_partition(
+        self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
+    ) -> None:
+        """Back up one partition of this store.
+
+        Backups are not available for the Aerospike store yet, so this
+        always raises :exc:`NotImplementedError`.
+
+        """
+        reason = "Not yet implemented for Aerospike."
+        raise NotImplementedError(reason)
+
+    def restore_backup(
+        self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
+    ) -> None:
+        """Restore one partition of this store from a backup.
+
+        Backups are not available for the Aerospike store yet, so this
+        always raises :exc:`NotImplementedError`.
+
+        """
+        reason = "Not yet implemented for Aerospike."
+        raise NotImplementedError(reason)
diff --git a/faust/stores/memory.py b/faust/stores/memory.py
index d0aefc6b7..484517628 100644
--- a/faust/stores/memory.py
+++ b/faust/stores/memory.py
@@ -1,5 +1,5 @@
 """In-memory table storage."""
-from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple
+from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple, Union
 
 from faust.types import TP, EventT
 from faust.types.stores import KT, VT
@@ -82,3 +82,23 @@ def reset_state(self) -> None:
 
         """
         ...
+
+    async def backup_partition(
+        self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
+    ) -> None:
+        """Back up one partition of this store.
+
+        A no-op for the in-memory store.
+
+        """
+        return None
+
+    def restore_backup(
+        self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
+    ) -> None:
+        """Restore one partition of this store from a backup.
+
+        A no-op for the in-memory store.
+
+        """
+        return None
diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py
index e7150d120..3181951b2 100644
--- a/faust/stores/rocksdb.py
+++ b/faust/stores/rocksdb.py
@@ -2,7 +2,9 @@
 import asyncio
 import gc
 import math
+import os
 import shutil
+import tempfile
 import typing
 from collections import defaultdict
 from contextlib import suppress
@@ -183,6 +185,93 @@ def __init__(
         self._key_index = LRUCache(limit=self.key_index_size)
         self.db_lock = asyncio.Lock()
         self.rebalance_ack = False
+        self._backup_path = os.path.join(self.path, f"{str(self.basename)}-backups")
+        # NOTE(review): every partition shares this one backup directory and
+        # engine, so the "latest" backup may belong to another partition than
+        # the one being restored -- confirm this is intended.
+        # Backups are optional: when the directory cannot be created or written
+        # to, warn and leave the engine unset so backup/restore become no-ops.
+        self._backup_engine = None
+        try:
+            os.makedirs(self._backup_path, exist_ok=True)
+            # Probe that files can really be created inside the directory.
+            with tempfile.TemporaryFile(dir=self._backup_path):
+                pass
+        except PermissionError:
+            self.log.warning(
+                f'Unable to make directory for path "{self._backup_path}", '
+                f"disabling backups."
+            )
+        except OSError:
+            self.log.warning(
+                f'Unable to create files in "{self._backup_path}", disabling backups'
+            )
+        else:
+            self._backup_engine = rocksdb.BackupEngine(self._backup_path)
+
+    async def backup_partition(
+        self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
+    ) -> None:
+        """Backup partition from this store.
+
+        This will be saved in a separate directory in the data directory called
+        '{table-name}-backups'.
+
+        Arguments:
+            tp: Partition to backup
+            flush: Flush the memtable before backing up the state of the table.
+            purge: Purge old backups in the process
+            keep: How many backups to keep after purging
+
+        Failures are logged with their traceback instead of being raised, and
+        nothing happens when backups were disabled at start-up.
+
+        This is only supported in newer versions of python-rocksdb which can read
+        the RocksDB database using multi-process read access.
+        See https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB to know more.
+        """
+        if not self._backup_engine:
+            return
+        partition = tp.partition if isinstance(tp, TP) else tp
+        try:
+            if flush:
+                db = await self._try_open_db_for_partition(partition)
+            else:
+                db = self.rocksdb_options.open(
+                    self.partition_path(partition), read_only=True
+                )
+            self._backup_engine.create_backup(db, flush_before_backup=flush)
+            if purge:
+                self._backup_engine.purge_old_backups(keep)
+        except Exception:
+            # Best effort: a failed backup must not stop the worker, but keep
+            # the traceback so the failure can be diagnosed.
+            self.log.exception(f"Unable to backup partition {partition}.")
+
+    def restore_backup(
+        self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
+    ) -> None:
+        """Restore partition backup from this store.
+
+        Arguments:
+            tp: Partition to restore
+            latest: Restore the latest backup, set as False to restore a specific ID
+            backup_id: Backup to restore
+
+        Does nothing when backups were disabled at start-up.
+
+        """
+        if not self._backup_engine:
+            return
+        partition = tp.partition if isinstance(tp, TP) else tp
+        # The second directory is where the WAL files are restored to; it must
+        # be the database directory, not the backup directory.
+        # NOTE(review): assumes no separate wal_dir is configured -- confirm.
+        db_dir = str(self.partition_path(partition))
+        if latest:
+            self._backup_engine.restore_latest_backup(db_dir, db_dir)
+        else:
+            self._backup_engine.restore_backup(backup_id, db_dir, db_dir)
 
     def persisted_offset(self, tp: TP) -> Optional[int]:
         """Return the last persisted offset.
diff --git a/faust/tables/objects.py b/faust/tables/objects.py
index d52869397..33b2ecfa8 100644
--- a/faust/tables/objects.py
+++ b/faust/tables/objects.py
@@ -183,3 +183,13 @@ def apply_changelog_batch(
 
         for tp, offset in tp_offsets.items():
             self.set_persisted_offset(tp, offset)
+
+    async def backup_partition(
+        self, tp, flush: bool = True, purge: bool = False, keep: int = 1
+    ) -> None:
+        """Back up one partition (not implemented here)."""
+        raise NotImplementedError()
+
+    def restore_backup(self, tp, latest: bool = True, backup_id: int = 0) -> None:
+        """Restore one partition from a backup (not implemented here)."""
+        raise NotImplementedError()
diff --git a/faust/types/stores.py b/faust/types/stores.py
index 238c8cee5..98c963001 100644
--- a/faust/types/stores.py
+++ b/faust/types/stores.py
@@ -101,3 +101,17 @@ async def on_recovery_completed(
         self, active_tps: Set[TP], standby_tps: Set[TP]
     ) -> None:
         ...
+
+    @abc.abstractmethod
+    async def backup_partition(
+        self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
+    ) -> None:
+        """Back up the given partition of this store."""
+        ...
+
+    @abc.abstractmethod
+    def restore_backup(
+        self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
+    ) -> None:
+        """Restore the given partition of this store from a backup."""
+        ...
diff --git a/tests/unit/stores/test_base.py b/tests/unit/stores/test_base.py
index 8d2c5411c..354e84f64 100644
--- a/tests/unit/stores/test_base.py
+++ b/tests/unit/stores/test_base.py
@@ -29,6 +29,14 @@ def apply_changelog_batch(self, *args, **kwargs):
     def reset_state(self):
         ...
 
+    async def backup_partition(
+        self, tp, flush: bool = True, purge: bool = False, keep: int = 1
+    ) -> None:
+        ...
+
+    def restore_backup(self, tp, latest: bool = True, backup_id: int = 0) -> None:
+        ...
+
 
 class Test_Store:
     @pytest.fixture
@@ -120,6 +128,14 @@ def _clear(self):
     def reset_state(self):
         ...
 
+    async def backup_partition(
+        self, tp, flush: bool = True, purge: bool = False, keep: int = 1
+    ) -> None:
+        ...
+
+    def restore_backup(self, tp, latest: bool = True, backup_id: int = 0) -> None:
+        ...
+
 
 class Test_SerializedStore:
     @pytest.fixture