Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle the "DatabaseError: database disk image is malformed" error #7628

Merged
merged 24 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9b8f22a
Make a comment a bit clearer by placing it above the described field
kozlovsky Sep 27, 2023
7299b1d
Add comments to inherited methods acquire_lock and release_lock
kozlovsky Sep 27, 2023
a6249c7
Renaming: PatchedSQLiteProvider -> TriblerSQLiteProvider
kozlovsky Sep 27, 2023
ff6fa57
Rearrange methods of TriblerDbSession to improve the readability: put…
kozlovsky Sep 27, 2023
277281e
Refactoring: pass TriblerSQLiteProvider as a bind argument instead of…
kozlovsky Sep 27, 2023
cd71c18
Handle corrupted databases
kozlovsky Sep 27, 2023
4da2ac4
Display the message to the user instead of sending the report to Sentry
kozlovsky Oct 13, 2023
c6f88ad
Fix checking the database version in migrations
kozlovsky Oct 13, 2023
044011c
Move get_db_version to pony_utils; add tests for get_db_version
kozlovsky Oct 16, 2023
93d4ef6
Do not require Tribler restart when the error happens during the upgrade
kozlovsky Oct 18, 2023
377e46a
Add comments to tests
kozlovsky Oct 19, 2023
c8d242d
Refactoring: simplify get_db_version logic
kozlovsky Nov 15, 2023
1c4456a
Add a comment to TriblerPool
kozlovsky Nov 15, 2023
bf584fb
Fix and unify the error messages when the database is corrupted
kozlovsky Nov 15, 2023
4f9b017
Fix handling corrupted database file before create_db flag is set
kozlovsky Nov 16, 2023
a2b889c
Detect database corruption when initial database connection is created
kozlovsky Nov 16, 2023
4276c8e
Immediately stop core process with the exit code 99 if the database c…
kozlovsky Nov 16, 2023
8a134c8
CoreManager restarts Core in case of database corruption; use unified…
kozlovsky Nov 16, 2023
51b3d7b
Satisfy linter
kozlovsky Nov 16, 2023
8d5b64d
Add exit_codes.py
kozlovsky Nov 16, 2023
a859041
Fix process_compressed_mdblob_threaded
kozlovsky Nov 16, 2023
8924ca4
Add a comment and tests to @catch_db_is_corrupted_exception decorator
kozlovsky Nov 17, 2023
e6d2c58
Handle database corruption exception at the SQLite connection level
kozlovsky Nov 20, 2023
5c52cb1
Fix Tribler UI starting when Core re-connects
kozlovsky Nov 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from tribler.core.components.bandwidth_accounting.db import history, misc, transaction as db_transaction
from tribler.core.components.bandwidth_accounting.db.transaction import BandwidthTransactionData
from tribler.core.utilities.db_corruption_handling.base import handle_db_if_corrupted
from tribler.core.utilities.pony_utils import TriblerDatabase
from tribler.core.utilities.utilities import MEMORY_DB

Expand Down Expand Up @@ -34,7 +35,7 @@ def __init__(self, db_path: Union[Path, type(MEMORY_DB)], my_pub_key: bytes,
# with the static analysis.
# pylint: disable=unused-variable

@self.database.on_connect(provider='sqlite')
@self.database.on_connect
def sqlite_sync_pragmas(_, connection):
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode = WAL")
Expand All @@ -50,6 +51,8 @@ def sqlite_sync_pragmas(_, connection):
create_db = True
db_path_string = ":memory:"
else:
# We need to handle the database corruption case before determining the state of the create_db flag.
handle_db_if_corrupted(db_path)
create_db = not db_path.is_file()
db_path_string = str(db_path)

Expand Down
21 changes: 21 additions & 0 deletions src/tribler/core/components/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from tribler.core.components.exceptions import ComponentStartupException, MissedDependency, NoneComponent
from tribler.core.components.reporter.exception_handler import default_core_exception_handler
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.exit_codes import EXITCODE_DATABASE_IS_CORRUPTED
from tribler.core.utilities.process_manager import get_global_process_manager

if TYPE_CHECKING:
from tribler.core.components.session import Session, T
Expand Down Expand Up @@ -47,8 +50,26 @@ async def start(self):
self._set_component_status(msg, logging.ERROR, exc_info=exc_info)
self.failed = True
self.started_event.set()

if isinstance(e, DatabaseIsCorrupted):
# When the database corruption is detected, we should stop the process immediately.
# Tribler GUI will restart the process and the database will be recreated.

# Usually we wrap an exception into ComponentStartupException and allow
# CoreExceptionHandler.unhandled_error_observer to handle it after all components are started,
# but in this case we don't do that. The reason is that handling ComponentStartupException
# starts the shutdown of Tribler, and for some obscure reason it is not possible to
# raise any exception, even SystemExit, from CoreExceptionHandler.unhandled_error_observer while
# Tribler is shutting down. It looks like in this case unhandled_error_observer is called from
# the Task.__del__ method, and all exceptions that are raised from __del__ are ignored.
# See https://bugs.python.org/issue25489 for a similar case.
process_manager = get_global_process_manager()
process_manager.sys_exit(EXITCODE_DATABASE_IS_CORRUPTED, e)
drew2a marked this conversation as resolved.
Show resolved Hide resolved
return # Added for clarity; actually, the code raised SystemExit on the previous line

if self.session.failfast:
raise e

self.session.set_startup_exception(ComponentStartupException(self, e))
self.started_event.set()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from tribler.core.components.metadata_store.db.orm_bindings.channel_node import COMMITTED
from tribler.core.components.metadata_store.db.serialization import CHANNEL_TORRENT
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.notifier import Notifier
from tribler.core.utilities.pony_utils import run_threaded
from tribler.core.utilities.simpledefs import DownloadStatus
Expand Down Expand Up @@ -80,6 +81,8 @@
channel.id_,
channel_download,
)
except DatabaseIsCorrupted:
raise # re-raise this exception and terminate the Core process

Check warning on line 85 in src/tribler/core/components/gigachannel_manager/gigachannel_manager.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/gigachannel_manager/gigachannel_manager.py#L84-L85

Added lines #L84 - L85 were not covered by tests
except Exception:
self._logger.exception("Error when tried to resume personal channel seeding on GigaChannel Manager startup")

Expand Down
2 changes: 1 addition & 1 deletion src/tribler/core/components/knowledge/db/knowledge_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class KnowledgeDatabase:
def __init__(self, filename: Optional[str] = None, *, create_tables: bool = True, **generate_mapping_kwargs):
self.instance = TriblerDatabase()
self.define_binding(self.instance)
self.instance.bind('sqlite', filename or ':memory:', create_db=True)
self.instance.bind(provider='sqlite', filename=filename or ':memory:', create_db=True)
generate_mapping_kwargs['create_tables'] = create_tables
self.instance.generate_mapping(**generate_mapping_kwargs)
self.logger = logging.getLogger(self.__class__.__name__)
Expand Down
12 changes: 9 additions & 3 deletions src/tribler/core/components/metadata_store/db/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from tribler.core.components.metadata_store.remote_query_community.payload_checker import process_payload
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
from tribler.core.exceptions import InvalidSignatureException
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted, handle_db_if_corrupted
from tribler.core.utilities.notifier import Notifier
from tribler.core.utilities.path_util import Path
from tribler.core.utilities.pony_utils import TriblerDatabase, get_max, get_or_create, run_threaded
Expand Down Expand Up @@ -166,7 +167,7 @@
# This attribute is internally called by Pony on startup, though pylint cannot detect it
# with the static analysis.
# pylint: disable=unused-variable
@self.db.on_connect(provider='sqlite')
@self.db.on_connect
def on_connect(_, connection):
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode = WAL")
Expand Down Expand Up @@ -218,6 +219,8 @@
create_db = True
db_path_string = ":memory:"
else:
# We need to handle the database corruption case before determining the state of the create_db flag.
handle_db_if_corrupted(db_filename)
create_db = not db_filename.is_file()
db_path_string = str(db_filename)

Expand Down Expand Up @@ -450,9 +453,12 @@
async def process_compressed_mdblob_threaded(self, compressed_data, **kwargs):
try:
return await run_threaded(self.db, self.process_compressed_mdblob, compressed_data, **kwargs)
except DatabaseIsCorrupted:
raise # re-raise this exception and terminate the Core process

Check warning on line 457 in src/tribler/core/components/metadata_store/db/store.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/metadata_store/db/store.py#L456-L457

Added lines #L456 - L457 were not covered by tests
except Exception as e: # pylint: disable=broad-except # pragma: no cover
self._logger.warning("DB transaction error when tried to process compressed mdblob: %s", str(e))
return None
self._logger.exception("DB transaction error when tried to process compressed mdblob: "
f"{e.__class__.__name__}: {e}", exc_info=e)
return []

def process_compressed_mdblob(self, compressed_data, **kwargs):
try:
Expand Down
14 changes: 11 additions & 3 deletions src/tribler/core/components/reporter/exception_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from tribler.core.components.exceptions import ComponentStartupException
from tribler.core.components.reporter.reported_error import ReportedError
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.exit_codes import EXITCODE_DATABASE_IS_CORRUPTED
from tribler.core.utilities.process_manager import get_global_process_manager

# There are some errors that we are ignoring.
Expand Down Expand Up @@ -93,12 +95,18 @@ def unhandled_error_observer(self, _, context):
should_stop = context.pop('should_stop', True)
message = context.pop('message', 'no message')
exception = context.pop('exception', None) or self._create_exception_from(message)
# Exception
text = str(exception)

self.logger.exception(f'{exception.__class__.__name__}: {exception}', exc_info=exception)
drew2a marked this conversation as resolved.
Show resolved Hide resolved

if isinstance(exception, DatabaseIsCorrupted):
process_manager.sys_exit(EXITCODE_DATABASE_IS_CORRUPTED, exception)
drew2a marked this conversation as resolved.
Show resolved Hide resolved
return # Added for clarity; actually, the code raised SystemExit on the previous line

if isinstance(exception, ComponentStartupException):
self.logger.info('The exception is ComponentStartupException')
should_stop = exception.component.tribler_should_stop_on_component_error
exception = exception.__cause__

if isinstance(exception, NoCrashException):
self.logger.info('The exception is NoCrashException')
should_stop = False
Expand All @@ -113,7 +121,7 @@ def unhandled_error_observer(self, _, context):

reported_error = ReportedError(
type=exception.__class__.__name__,
text=text,
text=str(exception),
long_text=long_text,
context=str(context),
event=self.sentry_reporter.event_from_exception(exception) or {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tribler.core.components.reporter.exception_handler import CoreExceptionHandler
from tribler.core.sentry_reporter import sentry_reporter
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted


# pylint: disable=protected-access, redefined-outer-name
Expand Down Expand Up @@ -85,6 +86,17 @@ def test_unhandled_error_observer_exception(exception_handler):
assert reported_error.should_stop


@patch('tribler.core.components.reporter.exception_handler.get_global_process_manager')
def test_unhandled_error_observer_database_corrupted(get_global_process_manager, exception_handler):
    """A DatabaseIsCorrupted exception must stop the Core process and must NOT be reported to the GUI.

    The test passes the exception to ``unhandled_error_observer`` and checks that ``sys_exit``
    of the (patched) global process manager is called with exit code 99 and the very same
    exception object, while ``report_callback`` is never invoked.
    """
    exception = DatabaseIsCorrupted('db_path_string')
    exception_handler.report_callback = MagicMock()
    exception_handler.unhandled_error_observer(None, {'exception': exception})

    # The process manager is asked to exit with code 99 (the database-corruption exit code) ...
    get_global_process_manager().sys_exit.assert_called_once_with(99, exception)
    # ... and the usual error-reporting path is skipped entirely.
    exception_handler.report_callback.assert_not_called()


def test_unhandled_error_observer_only_message(exception_handler):
# test that an unhandled exception, represented only by a message, is reported to the GUI
context = {'message': 'Any'}
Expand Down
2 changes: 0 additions & 2 deletions src/tribler/core/components/restapi/rest/rest_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ async def error_middleware(request, handler):
'message': f'Request size is larger than {MAX_REQUEST_SIZE} bytes'
}}, status=HTTP_REQUEST_ENTITY_TOO_LARGE)
except Exception as e:
logger.exception(e)
full_exception = traceback.format_exc()

default_core_exception_handler.unhandled_error_observer(None, {'exception': e, 'should_stop': False})

return RESTResponse({"error": {
Expand Down
17 changes: 17 additions & 0 deletions src/tribler/core/components/tests/test_base_component.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from unittest.mock import patch

import pytest

from tribler.core.components.component import Component
from tribler.core.components.exceptions import MissedDependency, MultipleComponentsFound, NoneComponent
from tribler.core.components.session import Session
from tribler.core.config.tribler_config import TriblerConfig
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted


class ComponentTestException(Exception):
Expand Down Expand Up @@ -46,6 +49,20 @@ class TestComponentB(TestComponent):
assert component.stopped


@patch('tribler.core.components.component.get_global_process_manager')
async def test_session_start_database_corruption_detected(get_global_process_manager):
    """Starting a component whose ``run`` raises DatabaseIsCorrupted must call ``sys_exit(99, <exception>)``.

    ``get_global_process_manager`` is patched, so ``sys_exit`` is a mock and does not
    actually terminate the test process.
    """
    corruption_error = DatabaseIsCorrupted('db_path_string')

    class TestComponent(Component):
        async def run(self):
            raise corruption_error

    await TestComponent().start()

    # The patched factory's return value is the process manager the component talks to.
    sys_exit = get_global_process_manager.return_value.sys_exit
    sys_exit.assert_called_once_with(99, corruption_error)


class ComponentA(Component):
pass

Expand Down
54 changes: 23 additions & 31 deletions src/tribler/core/upgrade/db8_to_db10.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import contextlib
import datetime
import logging
import sqlite3
from collections import deque
from time import time as now

from pony.orm import db_session

from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.utilities.db_corruption_handling import sqlite_replacement

TABLE_NAMES = (
"ChannelNode", "TorrentState", "TorrentState_TrackerState", "ChannelPeer", "ChannelVote", "TrackerState", "Vsids")
Expand Down Expand Up @@ -126,31 +126,31 @@ def convert_command(offset, batch_size):
def do_migration(self):
result = None # estimated duration in seconds of ChannelNode table copying time
try:

old_table_columns = {}
for table_name in TABLE_NAMES:
old_table_columns[table_name] = get_table_columns(self.old_db_path, table_name)

with contextlib.closing(sqlite3.connect(self.new_db_path)) as connection, connection:
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode = OFF;")
cursor.execute("PRAGMA synchronous = OFF;")
cursor.execute("PRAGMA foreign_keys = OFF;")
cursor.execute("PRAGMA temp_store = MEMORY;")
cursor.execute("PRAGMA cache_size = -204800;")
cursor.execute(f'ATTACH DATABASE "{self.old_db_path}" as old_db;')

for table_name in TABLE_NAMES:
t1 = now()
cursor.execute("BEGIN TRANSACTION;")
if not self.must_shutdown():
self.convert_table(cursor, table_name, old_table_columns[table_name])
cursor.execute("COMMIT;")
duration = now() - t1
self._logger.info(f"Upgrade: copied table {table_name} in {duration:.2f} seconds")

if table_name == 'ChannelNode':
result = duration
with contextlib.closing(sqlite_replacement.connect(self.new_db_path)) as connection:
with connection:
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode = OFF;")
cursor.execute("PRAGMA synchronous = OFF;")
cursor.execute("PRAGMA foreign_keys = OFF;")
cursor.execute("PRAGMA temp_store = MEMORY;")
cursor.execute("PRAGMA cache_size = -204800;")
cursor.execute(f'ATTACH DATABASE "{self.old_db_path}" as old_db;')

for table_name in TABLE_NAMES:
t1 = now()
cursor.execute("BEGIN TRANSACTION;")
if not self.must_shutdown():
self.convert_table(cursor, table_name, old_table_columns[table_name])
cursor.execute("COMMIT;")
duration = now() - t1
self._logger.info(f"Upgrade: copied table {table_name} in {duration:.2f} seconds")

if table_name == 'ChannelNode':
result = duration

self.update_status("Synchronizing the upgraded DB to disk, please wait.")
except Exception as e:
Expand Down Expand Up @@ -234,16 +234,8 @@ def calc_progress(duration_now, duration_half=60.0):


def get_table_columns(db_path, table_name):
with contextlib.closing(sqlite3.connect(db_path)) as connection, connection:
with contextlib.closing(sqlite_replacement.connect(db_path)) as connection, connection:
cursor = connection.cursor()
cursor.execute(f'SELECT * FROM {table_name} LIMIT 1')
names = [description[0] for description in cursor.description]
return names


def get_db_version(db_path):
with contextlib.closing(sqlite3.connect(db_path)) as connection, connection:
cursor = connection.cursor()
cursor.execute('SELECT value FROM MiscData WHERE name == "db_version"')
version = int(cursor.fetchone()[0])
return version
2 changes: 1 addition & 1 deletion src/tribler/core/upgrade/tags_to_knowledge/tags_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class TagDatabase:
def __init__(self, filename: Optional[str] = None, *, create_tables: bool = True, **generate_mapping_kwargs):
self.instance = TriblerDatabase()
self.define_binding(self.instance)
self.instance.bind('sqlite', filename or ':memory:', create_db=True)
self.instance.bind(provider='sqlite', filename=filename or ':memory:', create_db=True)
generate_mapping_kwargs['create_tables'] = create_tables
self.instance.generate_mapping(**generate_mapping_kwargs)

Expand Down
35 changes: 33 additions & 2 deletions src/tribler/core/upgrade/tests/test_upgrader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from pathlib import Path
from typing import Set
from unittest.mock import patch
from unittest.mock import Mock, patch

import pytest
from ipv8.keyvault.private.libnaclkey import LibNaCLSK
Expand All @@ -15,8 +15,10 @@
from tribler.core.tests.tools.common import TESTS_DATA_DIR
from tribler.core.upgrade.db8_to_db10 import calc_progress
from tribler.core.upgrade.tags_to_knowledge.tags_db import TagDatabase
from tribler.core.upgrade.upgrade import TriblerUpgrader, cleanup_noncompliant_channel_torrents
from tribler.core.upgrade.upgrade import TriblerUpgrader, catch_db_is_corrupted_exception, \
cleanup_noncompliant_channel_torrents
from tribler.core.utilities.configparser import CallbackConfigParser
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.utilities import random_infohash


Expand Down Expand Up @@ -55,6 +57,35 @@ def _copy(source_name, target):
shutil.copyfile(source, target)


def test_catch_db_is_corrupted_exception_with_exception():
    """The decorator must catch DatabaseIsCorrupted, store it on the upgrader and log it.

    The wrapped method is a mock that always raises DatabaseIsCorrupted; the call through
    the decorated method is expected to return normally.
    """
    upgrader = Mock(_db_is_corrupted_exception=None)
    upgrader_method = Mock(side_effect=DatabaseIsCorrupted())
    decorated_method = catch_db_is_corrupted_exception(upgrader_method)

    # Call the decorated method; the exception must not propagate to the caller
    decorated_method(upgrader)

    # The wrapped method ran exactly once, and the exception was stored and logged
    upgrader_method.assert_called_once()
    assert isinstance(upgrader._db_is_corrupted_exception, DatabaseIsCorrupted)
    upgrader._logger.exception.assert_called_once()


def test_catch_db_is_corrupted_exception_without_exception():
    """When the wrapped method succeeds, the decorator stores no exception and logs nothing."""
    upgrader = Mock(_db_is_corrupted_exception=None)
    wrapped = Mock()

    # Decorate and invoke in one step; the call must complete without raising
    catch_db_is_corrupted_exception(wrapped)(upgrader)

    # The wrapped method was executed once, and the upgrader state was left untouched
    wrapped.assert_called_once()
    upgrader._logger.exception.assert_not_called()
    assert upgrader._db_is_corrupted_exception is None


def test_upgrade_pony_db_complete(upgrader, channels_dir, state_dir, trustchain_keypair,
mds_path): # pylint: disable=W0621
"""
Expand Down
Loading
Loading