Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load checkpoints in background #7024

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
HTTPS_SCHEME,
HTTP_SCHEME,
MAGNET_SCHEME,
path_to_url,
scheme_from_url,
url_to_path,
)
Expand Down Expand Up @@ -66,12 +67,14 @@ def encode_atp(atp):


class DownloadManager(TaskManager):
START_TASK = "start"

def __init__(self,
state_dir,
notifier: Notifier,
peer_mid: bytes,
config: LibtorrentSettings = None,
gui_test_mode: bool = False,
download_defaults: DownloadDefaultsSettings = None,
bootstrap_infohash=None,
socks_listen_ports: Optional[List[int]] = None,
Expand All @@ -92,6 +95,7 @@ def __init__(self,
self.notifier = notifier
self.peer_mid = peer_mid
self.config = config or LibtorrentSettings()
self.gui_test_mode = gui_test_mode
self.bootstrap_infohash = bootstrap_infohash
self.download_defaults = download_defaults or DownloadDefaultsSettings()
self._libtorrent_port = None
Expand All @@ -101,6 +105,10 @@ def __init__(self,

self.downloads = {}

self.checkpoints_count = None
self.checkpoints_loaded = 0
self.all_checkpoints_are_loaded = False

self.metadata_tmpdir = None
# Dictionary that maps infohashes to download instances. These include only downloads that have
# been made specifically for fetching metainfo, and will be removed afterwards.
Expand Down Expand Up @@ -158,10 +166,23 @@ def initialize(self):

self.set_download_states_callback(self.sesscb_states_callback)

def start(self):
self.register_task(self.START_TASK, self._start)

async def _start(self):
await self.load_checkpoints()

if self.gui_test_mode:
from tribler.core.tests.tools.common import TORRENT_WITH_DIRS # pylint: disable=import-outside-toplevel
uri = path_to_url(TORRENT_WITH_DIRS)
await self.start_download_from_uri(uri)

def notify_shutdown_state(self, state):
self.notifier[notifications.tribler_shutdown_state](state)

async def shutdown(self, timeout=30):
self.cancel_pending_task(self.START_TASK)
self.cancel_pending_task("download_states_lc")
if self.downloads:
self.notify_shutdown_state("Checkpointing Downloads...")
await gather(*[download.stop() for download in self.downloads.values()], return_exceptions=True)
Expand Down Expand Up @@ -782,9 +803,6 @@ def set_download_states_callback(self, user_callback, interval=1.0):
self._logger.debug("Starting the download state callback with interval %f", interval)
self.replace_task("download_states_lc", self._invoke_states_cb, user_callback, interval=interval)

def stop_download_states_callback(self):
return self.cancel_pending_task("download_states_lc")

async def _invoke_states_cb(self, callback):
"""
Invoke the download states callback with a list of the download states.
Expand Down Expand Up @@ -825,9 +843,15 @@ def get_last_download_states(self):
return self._last_states_list

async def load_checkpoints(self):
for filename in self.get_checkpoint_dir().glob('*.conf'):
self._logger.info("Load checkpoints...")
checkpoint_filenames = list(self.get_checkpoint_dir().glob('*.conf'))
self.checkpoints_count = len(checkpoint_filenames)
for i, filename in enumerate(checkpoint_filenames, start=1):
self.load_checkpoint(filename)
self.checkpoints_loaded = i
await sleep(.01)
self.all_checkpoints_are_loaded = True
self._logger.info("Checkpoints are loaded")

def load_checkpoint(self, filename):
try:
Expand Down
11 changes: 3 additions & 8 deletions src/tribler/core/components/libtorrent/libtorrent_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.libtorrent.download_manager.download_manager import DownloadManager
from tribler.core.components.socks_servers.socks_servers_component import SocksServersComponent
from tribler.core.utilities.rest_utils import path_to_url


class LibtorrentComponent(Component):
Expand All @@ -21,6 +20,7 @@ async def run(self):

self.download_manager = DownloadManager(
config=config.libtorrent,
gui_test_mode=config.gui_test_mode,
state_dir=config.state_dir,
notifier=self.session.notifier,
peer_mid=key_component.primary_key.key_to_hash(),
Expand All @@ -30,15 +30,10 @@ async def run(self):
dummy_mode=config.gui_test_mode)
self.download_manager.initialize()

await self.download_manager.load_checkpoints()

if config.gui_test_mode:
from tribler.core.tests.tools.common import TORRENT_WITH_DIRS # pylint: disable=import-outside-toplevel
uri = path_to_url(TORRENT_WITH_DIRS)
await self.download_manager.start_download_from_uri(uri)
# load checkpoints in a background task to not delay initialization of dependent components (e.g. RESTComponent)
self.download_manager.start()

async def shutdown(self):
await super().shutdown()
if self.download_manager:
self.download_manager.stop_download_states_callback()
await self.download_manager.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
from tribler.core.utilities.utilities import froze_it


TOTAL = 'total'
LOADED = 'loaded'
ALL_LOADED = 'all_loaded'


def _safe_extended_peer_info(ext_peer_info):
"""
Given a string describing peer info, return a json.dumps() safe representation.
Expand Down Expand Up @@ -84,7 +89,7 @@ class DownloadsEndpoint(RESTEndpoint):
starting, pausing and stopping downloads.
"""

def __init__(self, download_manager, metadata_store=None, tunnel_community=None):
def __init__(self, download_manager: DownloadManager, metadata_store=None, tunnel_community=None):
super().__init__()
self.download_manager = download_manager
self.mds = metadata_store
Expand Down Expand Up @@ -219,6 +224,11 @@ def get_files_info_json(download):
'vod_prebuffering_progress_consec': Float,
'error': String,
'time_added': Integer
}),
'checkpoints': schema(Checkpoints={
TOTAL: Integer,
LOADED: Integer,
ALL_LOADED: Boolean,
})
}),
}
Expand All @@ -237,6 +247,15 @@ async def get_downloads(self, request):
get_pieces = params.get('get_pieces', '0') == '1'
get_files = params.get('get_files', '0') == '1'

checkpoints = {
TOTAL: self.download_manager.checkpoints_count,
LOADED: self.download_manager.checkpoints_loaded,
ALL_LOADED: self.download_manager.all_checkpoints_are_loaded,
}

if not self.download_manager.all_checkpoints_are_loaded:
return RESTResponse({"downloads": [], "checkpoints": checkpoints})

downloads_json = []
downloads = self.download_manager.get_downloads()
for download in downloads:
Expand Down Expand Up @@ -332,7 +351,7 @@ async def get_downloads(self, request):
download_json["files"] = self.get_files_info_json(download)

downloads_json.append(download_json)
return RESTResponse({"downloads": downloads_json})
return RESTResponse({"downloads": downloads_json, "checkpoints": checkpoints})

@docs(
tags=["Libtorrent"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,39 @@ def test_get_extended_status_circuits(mock_extended_status):
assert mock_extended_status == DLSTATUS_CIRCUITS


async def test_get_downloads_if_checkpoints_are_not_loaded(mock_dlmgr, rest_api):
mock_dlmgr.checkpoints_count = 10
mock_dlmgr.checkpoints_loaded = 5
mock_dlmgr.all_checkpoints_are_loaded = False

expected_json = {"downloads": [], "checkpoints": {"total": 10, "loaded": 5, "all_loaded": False}}
await do_request(rest_api, "downloads?get_peers=1&get_pieces=1", expected_code=200, expected_json=expected_json)


async def test_get_downloads_no_downloads(mock_dlmgr, rest_api):
"""
Testing whether the API returns an empty list when downloads are fetched but no downloads are active
"""
result = await do_request(rest_api, 'downloads?get_peers=1&get_pieces=1',
expected_code=200, expected_json={"downloads": []})
assert result["downloads"] == []
mock_dlmgr.checkpoints_count = 0
mock_dlmgr.checkpoints_loaded = 0
mock_dlmgr.all_checkpoints_are_loaded = True

expected_json = {"downloads": [], "checkpoints": {"total": 0, "loaded": 0, "all_loaded": True}}
await do_request(rest_api, "downloads?get_peers=1&get_pieces=1", expected_code=200, expected_json=expected_json)


async def test_get_downloads(mock_dlmgr, test_download, rest_api):
"""
Testing whether the API returns the right download when a download is added
"""
mock_dlmgr.get_downloads = lambda: [test_download]
mock_dlmgr.checkpoints_count = 1
mock_dlmgr.checkpoints_loaded = 1
mock_dlmgr.all_checkpoints_are_loaded = True

downloads = await do_request(rest_api, 'downloads?get_peers=1&get_pieces=1', expected_code=200)
assert len(downloads["downloads"]) == 1
assert downloads["checkpoints"] == {"total": 1, "loaded": 1, "all_loaded": True}


async def test_start_download_no_uri(rest_api):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from asyncio import Future, gather, get_event_loop, sleep
from unittest.mock import MagicMock

Expand Down Expand Up @@ -381,6 +382,13 @@ def mock_start_download(*_, **__):
assert not good


@pytest.mark.asyncio
async def test_download_manager_start(fake_dlmgr):
fake_dlmgr.start()
await asyncio.sleep(0.01)
assert fake_dlmgr.all_checkpoints_are_loaded


def test_load_empty_checkpoint(fake_dlmgr, tmpdir):
"""
Test whether download resumes with faulty pstate file.
Expand Down Expand Up @@ -413,8 +421,16 @@ def mocked_load_checkpoint(filename):
state_file.write(b"hi")

fake_dlmgr.load_checkpoint = mocked_load_checkpoint
assert fake_dlmgr.all_checkpoints_are_loaded is False
assert fake_dlmgr.checkpoints_count is None
assert fake_dlmgr.checkpoints_loaded == 0

await fake_dlmgr.load_checkpoints()

assert mocked_load_checkpoint.called
assert fake_dlmgr.all_checkpoints_are_loaded is True
assert fake_dlmgr.checkpoints_count == 1
assert fake_dlmgr.checkpoints_loaded == 1


@pytest.mark.asyncio
Expand Down
3 changes: 3 additions & 0 deletions src/tribler/core/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def mock_dlmgr(state_dir):
dlmgr.get_checkpoint_dir = lambda: checkpoints_dir
dlmgr.state_dir = state_dir
dlmgr.get_downloads = lambda: []
dlmgr.checkpoints_count = 1
dlmgr.checkpoints_loaded = 1
dlmgr.all_checkpoints_are_loaded = True
return dlmgr


Expand Down
2 changes: 1 addition & 1 deletion src/tribler/gui/event_request_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

received_events = []

CORE_CONNECTION_TIMEOUT = 60
CORE_CONNECTION_TIMEOUT = 120
RECONNECT_INTERVAL_MS = 100


Expand Down
8 changes: 2 additions & 6 deletions src/tribler/gui/tests/test_core_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,9 @@ def test_decode_raw_core_output(core_manager):


def test_format_error_message():
actual = CoreManager.format_error_message(exit_code=errno.ENOENT, exit_status=1, last_core_output='last\noutput')
actual = CoreManager.format_error_message(exit_code=errno.ENOENT, exit_status=1)
expected = '''The Tribler core has unexpectedly finished with exit code 2 and status: 1.

Error message: No such file or directory

Last core output:
> last
> output'''
Error message: No such file or directory'''
drew2a marked this conversation as resolved.
Show resolved Hide resolved

assert actual == expected
25 changes: 21 additions & 4 deletions src/tribler/gui/widgets/downloadspage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import os
import time
from typing import Optional

from PyQt5.QtCore import QTimer, QUrl, Qt, pyqtSignal
from PyQt5.QtGui import QDesktopServices
Expand Down Expand Up @@ -53,6 +55,7 @@ class DownloadsPage(AddBreadcrumbOnShowMixin, QWidget):

def __init__(self):
QWidget.__init__(self)
self._logger = logging.getLogger(self.__class__.__name__)
self.export_dir = None
self.filter = DOWNLOADS_FILTER_ALL
self.download_widgets = {} # key: infohash, value: QTreeWidgetItem
Expand All @@ -62,7 +65,8 @@ def __init__(self):
self.downloads_last_update = 0
self.selected_items = []
self.dialog = None
self.loading_message_widget = None
self.loading_message_widget: Optional[LoadingDownloadWidgetItem] = None
self.loading_list_item: Optional[LoadingListItem] = None
self.total_download = 0
self.total_upload = 0

Expand Down Expand Up @@ -109,10 +113,9 @@ def on_filter_text_changed(self, text):
def start_loading_downloads(self):
self.window().downloads_list.setSelectionMode(QAbstractItemView.NoSelection)
self.loading_message_widget = LoadingDownloadWidgetItem()
self.loading_list_item = LoadingListItem(self.window().downloads_list)
self.window().downloads_list.addTopLevelItem(self.loading_message_widget)
self.window().downloads_list.setItemWidget(
self.loading_message_widget, 2, LoadingListItem(self.window().downloads_list)
)
self.window().downloads_list.setItemWidget(self.loading_message_widget, 2, self.loading_list_item)
self.schedule_downloads_timer(now=True)

def schedule_downloads_timer(self, now=False):
Expand Down Expand Up @@ -148,6 +151,20 @@ def load_downloads(self):
def on_received_downloads(self, downloads):
if not downloads or "downloads" not in downloads:
return # This might happen when closing Tribler

checkpoints = downloads.get('checkpoints', {})
if checkpoints and self.loading_message_widget:
# If not all checkpoints are loaded, display the number of the loaded checkpoints
total = checkpoints['total']
loaded = checkpoints['loaded']
if not checkpoints['all_loaded']:
# The column is too narrow for a long message, probably we should redesign this UI element later
message = f'{loaded}/{total} checkpoints'
self._logger.info(f'Loading checkpoints: {message}')
self.loading_list_item.textlabel.setText(message)
self.schedule_downloads_timer()
return

loading_widget_index = self.window().downloads_list.indexOfTopLevelItem(self.loading_message_widget)
if loading_widget_index > -1:
self.window().downloads_list.takeTopLevelItem(loading_widget_index)
Expand Down