From c4757eeecd29817d7a23521898288bb6af112b8d Mon Sep 17 00:00:00 2001 From: Martin-Molinero Date: Mon, 7 Mar 2022 18:54:02 -0300 Subject: [PATCH] Bulk downloads improvements - Create the missing destination directory for downloads - Improve download progress bar to be more precise - Skip data agreement check for bulk data downloads --- lean/commands/data/download.py | 11 ++++++++- lean/components/api/data_client.py | 31 ++++++++++++++++++++---- lean/components/cloud/data_downloader.py | 12 ++++----- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/lean/commands/data/download.py b/lean/commands/data/download.py index 58e37880..e442525f 100644 --- a/lean/commands/data/download.py +++ b/lean/commands/data/download.py @@ -394,6 +394,13 @@ def _get_available_datasets(organization: QCFullOrganization) -> List[Dataset]: return available_datasets +def _is_bulk_download(products: List[Product]) -> bool: + for product in products: + if "data-type" in product.option_results and product.option_results["data-type"].value == "bulk": + return True + return False + + @click.command(cls=LeanCommand, requires_lean_config=True, allow_unknown_options=True) @click.option("--dataset", type=str, help="The name of the dataset to download non-interactively") @click.option("--organization", type=str, help="The name or id of the organization to purchase and download data with") @@ -431,7 +438,9 @@ def download(ctx: click.Context, products = _select_products_interactive(selected_organization, datasets) _confirm_organization_balance(selected_organization, products) - _accept_agreement(selected_organization, is_interactive) + + if not _is_bulk_download(products): + _accept_agreement(selected_organization, is_interactive) if is_interactive: _confirm_payment(selected_organization, products) diff --git a/lean/components/api/data_client.py b/lean/components/api/data_client.py index 3f43eb23..79f07ff6 100644 --- a/lean/components/api/data_client.py +++ b/lean/components/api/data_client.py @@ 
-11,12 +11,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List - +import os from shutil import move from tempfile import NamedTemporaryFile from lean.components.api.api_client import * from lean.models.api import QCDataInformation +from typing import List, Callable class DataClient: @@ -33,12 +33,14 @@ def __init__(self, api_client: 'APIClient', http_client: 'HTTPClient') -> None: self._api = api_client self._http_client = http_client - def download_file(self, relative_file_path: str, organization_id: str, local_filename: str) -> None: + def download_file(self, relative_file_path: str, organization_id: str, + local_filename: str, progress_callback: Callable[[float], None]) -> None: """Downloads the content of a downloadable data file. :param relative_file_path: the relative path of the data file :param organization_id: the id of the organization that should be billed :param local_filename: the final local path where the data file will be stored + :param progress_callback: the download progress callback """ data = self._api.post("data/read", { "format": "link", @@ -49,10 +51,29 @@ def download_file(self, relative_file_path: str, organization_id: str, local_fil # we stream the data into a temporary file and later move it to it's final location with self._http_client.get(data["link"], stream=True) as r: r.raise_for_status() + total_size = 0 + try: + total_size = int(r.headers['Content-length']) + except: + pass + current_size = 0 with NamedTemporaryFile(delete=False) as f: temp_file_name = f.name - for chunk in r.iter_content(chunk_size=8192): - f.write(chunk) + for chunk in r.iter_content(chunk_size=1024 * 1024): + if total_size != 0: + previous = current_size / total_size + + current_size += f.write(chunk) + + if total_size != 0: + # progressive progress update if we can + progress_callback((current_size / total_size) - previous) + if total_size == 0: + # if total size not available 
update progress at the end + progress_callback(1) + + directory = os.path.dirname(local_filename) + os.makedirs(directory, exist_ok=True) move(temp_file_name, local_filename) def download_public_file(self, data_endpoint: str) -> bytes: diff --git a/lean/components/cloud/data_downloader.py b/lean/components/cloud/data_downloader.py index c2dac517..baeaefa1 100644 --- a/lean/components/cloud/data_downloader.py +++ b/lean/components/cloud/data_downloader.py @@ -87,7 +87,7 @@ def download_files(self, data_files: List[Any], overwrite: bool, organization_id data_dir = self._lean_config_manager.get_data_directory() parallel(delayed(self._download_file)(data_file.file, overwrite, data_dir, organization_id, - lambda: progress.update(progress_task, advance=1)) + lambda advance: progress.update(progress_task, advance=advance)) for data_file in data_files) # update our config after we download all files, and not in parallel! @@ -117,7 +117,7 @@ def _download_file(self, overwrite: bool, data_directory: Path, organization_id: str, - callback: Callable[[], None]) -> None: + progress_callback: Callable[[float], None]) -> None: """Downloads a single file from QuantConnect Datasets to the local data directory. If this method downloads a map or factor files zip file, @@ -136,18 +136,16 @@ def _download_file(self, f"{local_path} already exists, use --overwrite to overwrite it", "You have not been charged for this file" ])) - callback() + progress_callback(1) return try: - self._api_client.data.download_file(relative_file, organization_id, local_path) + self._api_client.data.download_file(relative_file, organization_id, local_path, progress_callback) except RequestFailedError as error: self._logger.warn(f"{local_path}: {error}\nYou have not been charged for this file") - callback() + progress_callback(1) return # Special case: bulk files need unpacked if "setup/" in relative_file and relative_file.endswith(".tar"): self._process_bulk(local_path, data_directory) - - callback()