Skip to content

Commit

Permalink
Merge pull request #7024 from kozlovsky/fix/load_checkpoints_in_backg…
Browse files Browse the repository at this point in the history
…round

Load checkpoints in background
  • Loading branch information
kozlovsky authored Sep 8, 2022
2 parents 62b4a8c + d588785 commit 10c7154
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
HTTPS_SCHEME,
HTTP_SCHEME,
MAGNET_SCHEME,
path_to_url,
scheme_from_url,
url_to_path,
)
Expand Down Expand Up @@ -66,12 +67,14 @@ def encode_atp(atp):


class DownloadManager(TaskManager):
START_TASK = "start"

def __init__(self,
state_dir,
notifier: Notifier,
peer_mid: bytes,
config: LibtorrentSettings = None,
gui_test_mode: bool = False,
download_defaults: DownloadDefaultsSettings = None,
bootstrap_infohash=None,
socks_listen_ports: Optional[List[int]] = None,
Expand All @@ -92,6 +95,7 @@ def __init__(self,
self.notifier = notifier
self.peer_mid = peer_mid
self.config = config or LibtorrentSettings()
self.gui_test_mode = gui_test_mode
self.bootstrap_infohash = bootstrap_infohash
self.download_defaults = download_defaults or DownloadDefaultsSettings()
self._libtorrent_port = None
Expand All @@ -101,6 +105,10 @@ def __init__(self,

self.downloads = {}

self.checkpoints_count = None
self.checkpoints_loaded = 0
self.all_checkpoints_are_loaded = False

self.metadata_tmpdir = None
# Dictionary that maps infohashes to download instances. These include only downloads that have
# been made specifically for fetching metainfo, and will be removed afterwards.
Expand Down Expand Up @@ -158,10 +166,23 @@ def initialize(self):

self.set_download_states_callback(self.sesscb_states_callback)

def start(self):
self.register_task(self.START_TASK, self._start)

async def _start(self):
await self.load_checkpoints()

if self.gui_test_mode:
from tribler.core.tests.tools.common import TORRENT_WITH_DIRS # pylint: disable=import-outside-toplevel
uri = path_to_url(TORRENT_WITH_DIRS)
await self.start_download_from_uri(uri)

def notify_shutdown_state(self, state):
self.notifier[notifications.tribler_shutdown_state](state)

async def shutdown(self, timeout=30):
self.cancel_pending_task(self.START_TASK)
self.cancel_pending_task("download_states_lc")
if self.downloads:
self.notify_shutdown_state("Checkpointing Downloads...")
await gather(*[download.stop() for download in self.downloads.values()], return_exceptions=True)
Expand Down Expand Up @@ -782,9 +803,6 @@ def set_download_states_callback(self, user_callback, interval=1.0):
self._logger.debug("Starting the download state callback with interval %f", interval)
self.replace_task("download_states_lc", self._invoke_states_cb, user_callback, interval=interval)

def stop_download_states_callback(self):
return self.cancel_pending_task("download_states_lc")

async def _invoke_states_cb(self, callback):
"""
Invoke the download states callback with a list of the download states.
Expand Down Expand Up @@ -825,9 +843,15 @@ def get_last_download_states(self):
return self._last_states_list

async def load_checkpoints(self):
for filename in self.get_checkpoint_dir().glob('*.conf'):
self._logger.info("Load checkpoints...")
checkpoint_filenames = list(self.get_checkpoint_dir().glob('*.conf'))
self.checkpoints_count = len(checkpoint_filenames)
for i, filename in enumerate(checkpoint_filenames, start=1):
self.load_checkpoint(filename)
self.checkpoints_loaded = i
await sleep(.01)
self.all_checkpoints_are_loaded = True
self._logger.info("Checkpoints are loaded")

def load_checkpoint(self, filename):
try:
Expand Down
11 changes: 3 additions & 8 deletions src/tribler/core/components/libtorrent/libtorrent_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.libtorrent.download_manager.download_manager import DownloadManager
from tribler.core.components.socks_servers.socks_servers_component import SocksServersComponent
from tribler.core.utilities.rest_utils import path_to_url


class LibtorrentComponent(Component):
Expand All @@ -21,6 +20,7 @@ async def run(self):

self.download_manager = DownloadManager(
config=config.libtorrent,
gui_test_mode=config.gui_test_mode,
state_dir=config.state_dir,
notifier=self.session.notifier,
peer_mid=key_component.primary_key.key_to_hash(),
Expand All @@ -30,15 +30,10 @@ async def run(self):
dummy_mode=config.gui_test_mode)
self.download_manager.initialize()

await self.download_manager.load_checkpoints()

if config.gui_test_mode:
from tribler.core.tests.tools.common import TORRENT_WITH_DIRS # pylint: disable=import-outside-toplevel
uri = path_to_url(TORRENT_WITH_DIRS)
await self.download_manager.start_download_from_uri(uri)
# load checkpoints in a background task to not delay initialization of dependent components (e.g. RESTComponent)
self.download_manager.start()

async def shutdown(self):
await super().shutdown()
if self.download_manager:
self.download_manager.stop_download_states_callback()
await self.download_manager.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
from tribler.core.utilities.utilities import froze_it


TOTAL = 'total'
LOADED = 'loaded'
ALL_LOADED = 'all_loaded'


def _safe_extended_peer_info(ext_peer_info):
"""
Given a string describing peer info, return a json.dumps() safe representation.
Expand Down Expand Up @@ -84,7 +89,7 @@ class DownloadsEndpoint(RESTEndpoint):
starting, pausing and stopping downloads.
"""

def __init__(self, download_manager, metadata_store=None, tunnel_community=None):
def __init__(self, download_manager: DownloadManager, metadata_store=None, tunnel_community=None):
super().__init__()
self.download_manager = download_manager
self.mds = metadata_store
Expand Down Expand Up @@ -219,6 +224,11 @@ def get_files_info_json(download):
'vod_prebuffering_progress_consec': Float,
'error': String,
'time_added': Integer
}),
'checkpoints': schema(Checkpoints={
TOTAL: Integer,
LOADED: Integer,
ALL_LOADED: Boolean,
})
}),
}
Expand All @@ -237,6 +247,15 @@ async def get_downloads(self, request):
get_pieces = params.get('get_pieces', '0') == '1'
get_files = params.get('get_files', '0') == '1'

checkpoints = {
TOTAL: self.download_manager.checkpoints_count,
LOADED: self.download_manager.checkpoints_loaded,
ALL_LOADED: self.download_manager.all_checkpoints_are_loaded,
}

if not self.download_manager.all_checkpoints_are_loaded:
return RESTResponse({"downloads": [], "checkpoints": checkpoints})

downloads_json = []
downloads = self.download_manager.get_downloads()
for download in downloads:
Expand Down Expand Up @@ -332,7 +351,7 @@ async def get_downloads(self, request):
download_json["files"] = self.get_files_info_json(download)

downloads_json.append(download_json)
return RESTResponse({"downloads": downloads_json})
return RESTResponse({"downloads": downloads_json, "checkpoints": checkpoints})

@docs(
tags=["Libtorrent"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,39 @@ def test_get_extended_status_circuits(mock_extended_status):
assert mock_extended_status == DLSTATUS_CIRCUITS


async def test_get_downloads_if_checkpoints_are_not_loaded(mock_dlmgr, rest_api):
mock_dlmgr.checkpoints_count = 10
mock_dlmgr.checkpoints_loaded = 5
mock_dlmgr.all_checkpoints_are_loaded = False

expected_json = {"downloads": [], "checkpoints": {"total": 10, "loaded": 5, "all_loaded": False}}
await do_request(rest_api, "downloads?get_peers=1&get_pieces=1", expected_code=200, expected_json=expected_json)


async def test_get_downloads_no_downloads(mock_dlmgr, rest_api):
"""
Testing whether the API returns an empty list when downloads are fetched but no downloads are active
"""
result = await do_request(rest_api, 'downloads?get_peers=1&get_pieces=1',
expected_code=200, expected_json={"downloads": []})
assert result["downloads"] == []
mock_dlmgr.checkpoints_count = 0
mock_dlmgr.checkpoints_loaded = 0
mock_dlmgr.all_checkpoints_are_loaded = True

expected_json = {"downloads": [], "checkpoints": {"total": 0, "loaded": 0, "all_loaded": True}}
await do_request(rest_api, "downloads?get_peers=1&get_pieces=1", expected_code=200, expected_json=expected_json)


async def test_get_downloads(mock_dlmgr, test_download, rest_api):
"""
Testing whether the API returns the right download when a download is added
"""
mock_dlmgr.get_downloads = lambda: [test_download]
mock_dlmgr.checkpoints_count = 1
mock_dlmgr.checkpoints_loaded = 1
mock_dlmgr.all_checkpoints_are_loaded = True

downloads = await do_request(rest_api, 'downloads?get_peers=1&get_pieces=1', expected_code=200)
assert len(downloads["downloads"]) == 1
assert downloads["checkpoints"] == {"total": 1, "loaded": 1, "all_loaded": True}


async def test_start_download_no_uri(rest_api):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from asyncio import Future, gather, get_event_loop, sleep
from unittest.mock import MagicMock

Expand Down Expand Up @@ -381,6 +382,13 @@ def mock_start_download(*_, **__):
assert not good


@pytest.mark.asyncio
async def test_download_manager_start(fake_dlmgr):
fake_dlmgr.start()
await asyncio.sleep(0.01)
assert fake_dlmgr.all_checkpoints_are_loaded


def test_load_empty_checkpoint(fake_dlmgr, tmpdir):
"""
Test whether download resumes with faulty pstate file.
Expand Down Expand Up @@ -413,8 +421,16 @@ def mocked_load_checkpoint(filename):
state_file.write(b"hi")

fake_dlmgr.load_checkpoint = mocked_load_checkpoint
assert fake_dlmgr.all_checkpoints_are_loaded is False
assert fake_dlmgr.checkpoints_count is None
assert fake_dlmgr.checkpoints_loaded == 0

await fake_dlmgr.load_checkpoints()

assert mocked_load_checkpoint.called
assert fake_dlmgr.all_checkpoints_are_loaded is True
assert fake_dlmgr.checkpoints_count == 1
assert fake_dlmgr.checkpoints_loaded == 1


@pytest.mark.asyncio
Expand Down
3 changes: 3 additions & 0 deletions src/tribler/core/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def mock_dlmgr(state_dir):
dlmgr.get_checkpoint_dir = lambda: checkpoints_dir
dlmgr.state_dir = state_dir
dlmgr.get_downloads = lambda: []
dlmgr.checkpoints_count = 1
dlmgr.checkpoints_loaded = 1
dlmgr.all_checkpoints_are_loaded = True
return dlmgr


Expand Down
2 changes: 1 addition & 1 deletion src/tribler/gui/event_request_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

received_events = []

CORE_CONNECTION_TIMEOUT = 60
CORE_CONNECTION_TIMEOUT = 120
RECONNECT_INTERVAL_MS = 100


Expand Down
8 changes: 2 additions & 6 deletions src/tribler/gui/tests/test_core_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,9 @@ def test_decode_raw_core_output(core_manager):


def test_format_error_message():
actual = CoreManager.format_error_message(exit_code=errno.ENOENT, exit_status=1, last_core_output='last\noutput')
actual = CoreManager.format_error_message(exit_code=errno.ENOENT, exit_status=1)
expected = '''The Tribler core has unexpectedly finished with exit code 2 and status: 1.
Error message: No such file or directory
Last core output:
> last
> output'''
Error message: No such file or directory'''

assert actual == expected
25 changes: 21 additions & 4 deletions src/tribler/gui/widgets/downloadspage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import os
import time
from typing import Optional

from PyQt5.QtCore import QTimer, QUrl, Qt, pyqtSignal
from PyQt5.QtGui import QDesktopServices
Expand Down Expand Up @@ -53,6 +55,7 @@ class DownloadsPage(AddBreadcrumbOnShowMixin, QWidget):

def __init__(self):
QWidget.__init__(self)
self._logger = logging.getLogger(self.__class__.__name__)
self.export_dir = None
self.filter = DOWNLOADS_FILTER_ALL
self.download_widgets = {} # key: infohash, value: QTreeWidgetItem
Expand All @@ -62,7 +65,8 @@ def __init__(self):
self.downloads_last_update = 0
self.selected_items = []
self.dialog = None
self.loading_message_widget = None
self.loading_message_widget: Optional[LoadingDownloadWidgetItem] = None
self.loading_list_item: Optional[LoadingListItem] = None
self.total_download = 0
self.total_upload = 0

Expand Down Expand Up @@ -109,10 +113,9 @@ def on_filter_text_changed(self, text):
def start_loading_downloads(self):
self.window().downloads_list.setSelectionMode(QAbstractItemView.NoSelection)
self.loading_message_widget = LoadingDownloadWidgetItem()
self.loading_list_item = LoadingListItem(self.window().downloads_list)
self.window().downloads_list.addTopLevelItem(self.loading_message_widget)
self.window().downloads_list.setItemWidget(
self.loading_message_widget, 2, LoadingListItem(self.window().downloads_list)
)
self.window().downloads_list.setItemWidget(self.loading_message_widget, 2, self.loading_list_item)
self.schedule_downloads_timer(now=True)

def schedule_downloads_timer(self, now=False):
Expand Down Expand Up @@ -148,6 +151,20 @@ def load_downloads(self):
def on_received_downloads(self, downloads):
if not downloads or "downloads" not in downloads:
return # This might happen when closing Tribler

checkpoints = downloads.get('checkpoints', {})
if checkpoints and self.loading_message_widget:
# If not all checkpoints are loaded, display the number of the loaded checkpoints
total = checkpoints['total']
loaded = checkpoints['loaded']
if not checkpoints['all_loaded']:
# The column is too narrow for a long message, probably we should redesign this UI element later
message = f'{loaded}/{total} checkpoints'
self._logger.info(f'Loading checkpoints: {message}')
self.loading_list_item.textlabel.setText(message)
self.schedule_downloads_timer()
return

loading_widget_index = self.window().downloads_list.indexOfTopLevelItem(self.loading_message_widget)
if loading_widget_index > -1:
self.window().downloads_list.takeTopLevelItem(loading_widget_index)
Expand Down

0 comments on commit 10c7154

Please sign in to comment.