Skip to content

Commit

Permalink
Merge pull request #72 from QuantConnect/refactor-cli-downloads-are-s…
Browse files Browse the repository at this point in the history
…treamed

Bulk downloads improvements
  • Loading branch information
Martin-Molinero authored Mar 7, 2022
2 parents e6614b7 + c4757ee commit 4bef6e6
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
11 changes: 10 additions & 1 deletion lean/commands/data/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ def _get_available_datasets(organization: QCFullOrganization) -> List[Dataset]:
return available_datasets


def _is_bulk_download(products: List[Product]) -> bool:
for product in products:
if "data-type" in product.option_results and product.option_results["data-type"].value == "bulk":
return True
return False


@click.command(cls=LeanCommand, requires_lean_config=True, allow_unknown_options=True)
@click.option("--dataset", type=str, help="The name of the dataset to download non-interactively")
@click.option("--organization", type=str, help="The name or id of the organization to purchase and download data with")
Expand Down Expand Up @@ -431,7 +438,9 @@ def download(ctx: click.Context,
products = _select_products_interactive(selected_organization, datasets)

_confirm_organization_balance(selected_organization, products)
_accept_agreement(selected_organization, is_interactive)

if not _is_bulk_download(products):
_accept_agreement(selected_organization, is_interactive)

if is_interactive:
_confirm_payment(selected_organization, products)
Expand Down
31 changes: 26 additions & 5 deletions lean/components/api/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

import os
from shutil import move
from tempfile import NamedTemporaryFile
from lean.components.api.api_client import *
from lean.models.api import QCDataInformation
from typing import List, Callable


class DataClient:
Expand All @@ -33,12 +33,14 @@ def __init__(self, api_client: 'APIClient', http_client: 'HTTPClient') -> None:
self._api = api_client
self._http_client = http_client

def download_file(self, relative_file_path: str, organization_id: str, local_filename: str) -> None:
def download_file(self, relative_file_path: str, organization_id: str,
local_filename: str, progress_callback: Callable[[float], None]) -> None:
"""Downloads the content of a downloadable data file.
:param relative_file_path: the relative path of the data file
:param organization_id: the id of the organization that should be billed
:param local_filename: the final local path where the data file will be stored
:param progress_callback: the download progress callback
"""
data = self._api.post("data/read", {
"format": "link",
Expand All @@ -49,10 +51,29 @@ def download_file(self, relative_file_path: str, organization_id: str, local_fil
# we stream the data into a temporary file and later move it to it's final location
with self._http_client.get(data["link"], stream=True) as r:
r.raise_for_status()
total_size = 0
try:
total_size = int(r.headers['Content-length'])
except:
pass
current_size = 0
with NamedTemporaryFile(delete=False) as f:
temp_file_name = f.name
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
for chunk in r.iter_content(chunk_size=1024 * 1024):
if total_size != 0:
previous = current_size / total_size

current_size += f.write(chunk)

if total_size != 0:
# progressive progress update if we can
progress_callback((current_size / total_size) - previous)
if total_size == 0:
# if total size not available update progress at the end
progress_callback(1)

directory = os.path.dirname(local_filename)
os.makedirs(directory, exist_ok=True)
move(temp_file_name, local_filename)

def download_public_file(self, data_endpoint: str) -> bytes:
Expand Down
12 changes: 5 additions & 7 deletions lean/components/cloud/data_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def download_files(self, data_files: List[Any], overwrite: bool, organization_id

data_dir = self._lean_config_manager.get_data_directory()
parallel(delayed(self._download_file)(data_file.file, overwrite, data_dir, organization_id,
lambda: progress.update(progress_task, advance=1))
lambda advance: progress.update(progress_task, advance=advance))
for data_file in data_files)

# update our config after we download all files, and not in parallel!
Expand Down Expand Up @@ -117,7 +117,7 @@ def _download_file(self,
overwrite: bool,
data_directory: Path,
organization_id: str,
callback: Callable[[], None]) -> None:
progress_callback: Callable[[float], None]) -> None:
"""Downloads a single file from QuantConnect Datasets to the local data directory.
If this method downloads a map or factor files zip file,
Expand All @@ -136,18 +136,16 @@ def _download_file(self,
f"{local_path} already exists, use --overwrite to overwrite it",
"You have not been charged for this file"
]))
callback()
progress_callback(1)
return

try:
self._api_client.data.download_file(relative_file, organization_id, local_path)
self._api_client.data.download_file(relative_file, organization_id, local_path, progress_callback)
except RequestFailedError as error:
self._logger.warn(f"{local_path}: {error}\nYou have not been charged for this file")
callback()
progress_callback(1)
return

# Special case: bulk files need unpacked
if "setup/" in relative_file and relative_file.endswith(".tar"):
self._process_bulk(local_path, data_directory)

callback()

0 comments on commit 4bef6e6

Please sign in to comment.