Skip to content

Commit

Permalink
Stream downloads
Browse files Browse the repository at this point in the history
- Stream data downloads. Skip on tar error
  • Loading branch information
Martin-Molinero committed Mar 7, 2022
1 parent 539c2f1 commit 770f9c9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
14 changes: 10 additions & 4 deletions lean/components/api/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,18 @@ def _request(self, method: str, endpoint: str, options: Dict[str, Any] = {}, ret
timestamp = str(int(time()))
password = sha256(f"{self._api_token}:{timestamp}".encode("utf-8")).hexdigest()

headers = {
"Timestamp": timestamp
}

version = lean.__version__
if lean.__version__ == 'dev':
version = 99999999
headers["User-Agent"] = f"Lean CLI {version}"

response = self._http_client.request(method,
full_url,
headers={
"Timestamp": timestamp,
"User-Agent": f"Lean CLI {lean.__version__}"
},
headers=headers,
auth=(self._user_id, password),
raise_for_status=False,
**options)
Expand Down
19 changes: 14 additions & 5 deletions lean/components/api/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from typing import List

from shutil import move
from tempfile import NamedTemporaryFile
from lean.components.api.api_client import *
from lean.models.api import QCDataInformation

Expand All @@ -31,20 +33,27 @@ def __init__(self, api_client: 'APIClient', http_client: 'HTTPClient') -> None:
self._api = api_client
self._http_client = http_client

def download_file(self, file_path: str, organization_id: str) -> bytes:
def download_file(self, relative_file_path: str, organization_id: str, local_filename: str) -> None:
"""Downloads the content of a downloadable data file.
:param file_path: the path of the data file
:param relative_file_path: the relative path of the data file
:param organization_id: the id of the organization that should be billed
:return: the content of the data file
:param local_filename: the final local path where the data file will be stored
"""
data = self._api.post("data/read", {
"format": "link",
"filePath": file_path,
"filePath": relative_file_path,
"organizationId": organization_id
})

return self._http_client.get(data["link"]).content
# we stream the data into a temporary file and later move it to its final location
with self._http_client.get(data["link"], stream=True) as r:
r.raise_for_status()
with NamedTemporaryFile(delete=False) as f:
temp_file_name = f.name
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
move(temp_file_name, local_filename)

def download_public_file(self, data_endpoint: str) -> bytes:
"""Downloads the content of a downloadable public file.
Expand Down
10 changes: 4 additions & 6 deletions lean/components/cloud/data_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def update_database_files(self):
if "not found" in str(e):
pass
else:
self._logger.error(str(e))
self._logger.error(str(e))
except Exception as e:
self._logger.error(str(e))

Expand Down Expand Up @@ -100,14 +100,15 @@ def download_files(self, data_files: List[Any], overwrite: bool, organization_id
self._lean_config_manager.set_properties({
"factor-file-provider": "QuantConnect.Data.Auxiliary.LocalZipFactorFileProvider"
})

progress.stop()
except KeyboardInterrupt as e:
progress.stop()
raise e

def _process_bulk(self, file: Path, destination: Path):
    """Unpacks a downloaded bulk tar archive into the destination directory.

    :param file: the path of the tar archive to unpack
    :param destination: the directory the archive members are extracted into
    """
    # The context manager closes the archive even when extraction raises,
    # so the file handle is never leaked
    with tarfile.open(file) as tar:
        # errorlevel 0 tells tarfile to ignore errors raised while extracting
        # members, so one bad entry does not abort the whole unpack
        tar.errorlevel = 0
        tar.extractall(destination)

Expand Down Expand Up @@ -138,16 +139,13 @@ def _download_file(self,
callback()
return


try:
file_content = self._api_client.data.download_file(relative_file, organization_id)
self._api_client.data.download_file(relative_file, organization_id, local_path)
except RequestFailedError as error:
self._logger.warn(f"{local_path}: {error}\nYou have not been charged for this file")
callback()
return

_store_local_file(file_content, local_path)

# Special case: bulk files need unpacked
if "setup/" in relative_file and relative_file.endswith(".tar"):
self._process_bulk(local_path, data_directory)
Expand Down

0 comments on commit 770f9c9

Please sign in to comment.