From 5acdf642e01bfc25d0fb68c2ff7405d88ebee946 Mon Sep 17 00:00:00 2001
From: MickaelRigault
Date: Fri, 14 Apr 2023 11:51:15 +0200
Subject: [PATCH] adding session option for downloading using requests

---
 pyproject.toml       |   2 +-
 ztfquery/__init__.py |   2 +-
 ztfquery/io.py       | 223 +++++++++++++++++++++++++++++--------------
 3 files changed, 152 insertions(+), 75 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 0e27be5..0caf034 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "ztfquery"
-version = "1.24.10"
+version = "1.25.0"
 description = "Python package to access ZTF data"
 authors = ["Mickael Rigault ", "Simeon Reusch "]
 maintainers = ["Mickael Rigault ", "Simeon Reusch "]

diff --git a/ztfquery/__init__.py b/ztfquery/__init__.py
index 5044ba0..6e165a2 100755
--- a/ztfquery/__init__.py
+++ b/ztfquery/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "1.24.10"
+__version__ = "1.25.0"

 from .io import get_file
 from .query import ZTFQuery, get_metadata

diff --git a/ztfquery/io.py b/ztfquery/io.py
index 3e0996f..cf5ccff 100755
--- a/ztfquery/io.py
+++ b/ztfquery/io.py
@@ -38,6 +38,7 @@
 def get_file(
     filename,
     suffix=None,
+    session=None,
     downloadit=True,
     check_suffix=True,
     dlfrom="irsa",
@@ -57,11 +58,11 @@
     Parameters
     ----------
-    filename: [string]
+    filename: str
         name of the file you want. raw, cal and sci filenames are accepted.

-    suffix: [string] -optional-
+    suffix: str
         actual suffix of the file you want. By default it is that of the filename,
         but you can request associated files with another suffix.
         For instance:
@@ -69,10 +70,13 @@
         you will be looking for ztf_20190917468333_000698_zi_c03_o_q2_mskimg.fits.
         - If you input a raw file, then suffix is used as 'imgtypecode'

-    downloadit: [bool] -optional-
+    session: requests.Session
+        a requests session you already have (if any)
+
+    downloadit: bool
         If you do not have the file locally, shall this download it for you?

-    wait: [None/string/float] -optional-
+    wait: None, str or float
         Waiting time before downloading the file. It helps mostly when dask is massively multi-downloading.
         - None: if Client, wait -> len(to_be_downloaded)/100 (so 100 per second); corresponds to wait="100"
         - "None": no limit
@@ -105,7 +109,7 @@
     # local_filenames
     if overwrite:
-        flag_todl = np.asarray(np.ones(len(local_filenames)), dtype="bool")
+        flag_todl = np.asarray( np.ones( len(local_filenames)), dtype="bool")
     else:
         flag_todl = np.asarray(
             [
@@ -133,6 +137,7 @@
         f_ = download_from_filename(
             local_filenames[flag_todl],
+            session=session,
             show_progress=show_progress,
             host=dlfrom,
             overwrite=True,
@@ -143,7 +148,6 @@
         )
         if client is not None:
             from dask.distributed import wait as dwait
-
             _ = dwait(f_)

     # - Output
@@ -186,41 +190,60 @@ def filefracday_and_ccdid_to_rawfilename(filefracday, ccdid, source="local"):
     )


-def bulk_get_file(filenames, client=None, suffix=None, as_dask="delayed", **kwargs):
+def bulk_get_file(filenames, client=None, suffix=None,
+                  as_dask="delayed", **kwargs):
     """

     Parameters
     ----------
-    as_dask: [string] -optional-
-        could be
+    as_dask: str
+        could be:
         - delayed
         - futures
        - gathered
+        - computed
+
+    Returns
+    -------
+    list
+        - the computed or gathered options return the list of downloaded filepaths
+          -> list_of_file
+        - the delayed or futures options also return the session, which should
+          eventually be closed
+          -> list_of_file, session
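+
+    Example
+    -------
+    a minimal usage sketch, assuming `filenames` holds valid ZTF science filenames::
+
+        import dask
+        files, session = bulk_get_file(filenames, as_dask="delayed")
+        filepaths = dask.delayed(list)(files).compute()
+        session.close()  # the delayed option leaves the session open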
     """
     import dask

+    session = open_irsa_session()
+
     if client is None and as_dask in ["gather", "gathered"]:
         as_dask = "compute"
-    d_files = [
-        dask.delayed(get_file)(
-            filename, suffix=suffix, show_progress=False, maxnprocess=1, **kwargs
-        )
-        for filename in filenames
-    ]
+
+    d_files = [ dask.delayed(get_file)(filename, suffix=suffix, session=session,
+                                       show_progress=False, maxnprocess=1, **kwargs)
+                for filename in filenames]
+
     if as_dask == "delayed":
-        return d_files
+        return d_files, session
+
     if as_dask in ["computed", "compute"]:
-        return dask.delayed(list)(d_files).compute()
+        results = dask.delayed(list)(d_files).compute()
+        session.close()
+        return results

     if as_dask == "persist":
         if client is not None:
-            return client.persist(d_files)
-        return [f_.persist() for f_ in d_files]
+            persisted = client.persist(d_files)
+        else:
+            persisted = [f_.persist() for f_ in d_files]
+        return persisted, session

     futures = client.compute(d_files)
     if as_dask == "futures":
-        return futures
+        return futures, session

     if as_dask in ["gather", "gathered"]:
-        return client.gather(futures)
+        gathered = client.gather(futures)
+        session.close()
+        return gathered
+
+    raise ValueError(f"Cannot parse the given as_dask {as_dask}")

@@ -268,9 +291,9 @@ def parse_filename(filename, as_serie=True):
     return parsed

-
 def download_from_filename(
     filename,
+    session=None,
     suffix=None,
     overwrite=False,
     auth=None,
@@ -283,15 +306,20 @@
     wait=None,
     **kwargs,
 ):
-    """Download the file associated to the given filename"""
+    """ Download the file associated with the given filename
+
+    Parameters
+    ----------
+    session: requests.Session
+        session used to call the get method
+    """
     if host not in ["irsa", "ccin2p3"]:
         raise ValueError(f"Only 'irsa' and 'ccin2p3' host implemented: {host} given")

     from .buildurl import filename_to_url

-    if auth is None:
-        auth = _load_id_(host)
-
     remote_filename = []
     local_filename = []
     for file_ in np.atleast_1d(filename):
@@ -315,28 +343,23 @@
     if nodl:
         return [remote_filename, local_filename]

-    if host == "ccin2p3":
-        return [
-            CCIN2P3.scp(remote_filename_, local_filename_, auth=auth)
-            for remote_filename_, local_filename_ in zip(
-                remote_filename, local_filename
-            )
-        ]
-
-    else:
-        nprocess = np.min([maxnprocess, len(local_filename)])
-
-        return download_url(
+    nprocess = np.min([maxnprocess, len(local_filename)])
+    # cookies:
+    if auth is None:
+        auth = None, None  # no username, no password
+    cookies = get_cookie(*auth, session=session, update=False)
+
+    return download_url(
         remote_filename,
         local_filename,
+        session=session,
         nprocess=nprocess,
         client=client,
         wait=wait,
         overwrite=overwrite,
-        cookies=get_cookie(*auth),
+        cookies=cookies,
        show_progress=show_progress,
-        **kwargs,
-    )
+        **kwargs)


 def _parse_filename_(filename, builddir=False, squeeze=True, exists=False):
@@ -366,6 +389,29 @@ def _parse_filename_(filename, builddir=False, squeeze=True, exists=False):
     return localfile


+def open_irsa_session(auth=None, incl_cookies=True):
+    """ open a session that carries the IRSA login cookies
+
+    Parameters
+    ----------
+    auth: list
+        IRSA login as (id, pwd)
+
+    incl_cookies: bool
+        should the session acquire the IRSA login cookies?
+
+    Returns
+    -------
+    requests.Session
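+
+    Example
+    -------
+    a minimal sketch; with auth=None the stored IRSA credentials are used::
+
+        session = open_irsa_session()
+        # pass `session` to get_file / download_url / download_single_url
+        session.close()  # once the downloads are done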
+    """
+    session = requests.Session()
+
+    if incl_cookies:
+        if auth is None:
+            auth = None, None  # no username, no password
+        _ = get_cookie(*auth, session=session, update=True)  # update=True is most likely a no-op here, but safer
+
+    return session

 # ================= #
 #  Crypting         #
@@ -408,7 +454,7 @@
     force=False,
     token_based=False,
     no_user=False,
-):
+    ):
     """Setup the username and password (simply encrypted!) for the given `which` account.
     Saved in ~/.ztfquery
     """
@@ -484,12 +530,11 @@
 # TEST #
 # - Password testing
-def test_irsa_account(auth=None):
+def test_irsa_account(auth=None, **kwargs):
     """returns True if the IRSA account is correctly set."""
     if auth is None:
         auth = _load_id_("irsa")
-    return ".ipac.caltech.edu" in get_cookie(*auth)._cookies
-
+    return ".ipac.caltech.edu" in get_cookie(*auth, **kwargs)._cookies

 # - File testing
 def get_localfiles(extension="*", startpath=None):
@@ -587,7 +632,7 @@
     show_progress=True,
     redownload=False,
     **kwargs,
-):
+    ):
     """

     Parameters
@@ -670,9 +715,11 @@
             np.asarray(to_download_urls)[source_dl],
             np.asarray(fileissue)[source_dl],
             show_progress=show_progress,
+            session=session,
             overwrite=True,
             nprocess=nprocess,
-            cookies=get_cookie(*_load_id_(source)),
+            cookies=get_cookie(*_load_id_(source),
+                               session=session, update=False),
             **kwargs,
         )
         for source_ in np.unique(locations):
@@ -780,33 +827,55 @@ def _fileissue_(filename, erasebad=True, fromdl=False, redownload=False):
 # ================= #
 #  Logging Tools    #
 # ================= #
-def get_cookie(username, password):
-    """Get a cookie from the IPAC login service
+def get_cookie(username=None, password=None, session=None, update=False):
+    """ get a cookie from the IPAC login service

     Parameters
     ----------
-    username: [str]
-        The IPAC account username
-    password: [str]
+    username: str, None
+        The IPAC account username.
+        If None is given, the stored one is used (or you are prompted for one).
+
+    password: str, None
         The IPAC account password
+        If None is given, the stored one is used (or you are prompted for one).
+
+    session: requests.Session
+        session used to call the get method
+
+    update: bool
+        if a session is given and that session already has cookies,
+        should this update them? If not, nothing happens here.
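+
+    Example
+    -------
+    a minimal sketch; credentials are loaded from the stored IRSA account
+    when not given explicitly::
+
+        session = requests.Session()
+        _ = get_cookie(session=session, update=True)  # attaches the login cookies to `session`
+        cookies = get_cookie()  # no session: simply returns the cookie jar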
     """
+    # the session already has cookies; should this update them?
+    if session is not None and len(session.cookies) > 0 and not update:
+        return None
+
+    if username is None or password is None:
+        username, password = _load_id_("irsa")
+
     url = "%s?josso_cmd=login&josso_username=%s&josso_password=%s" % (
         LOGIN_URL,
         username,
         password,
     )
-    return requests.get(url).cookies
+    if session is not None:
+        _ = session.get(url)  # this attaches the cookies to the session
+        return None  # the session now carries the cookies
+
+    return requests.get(url).cookies  # this returns the cookies


 def _download_(args):
     """To be used within _ZTFDownloader_.download_data()

     url, fileout, session, overwrite, wait, cutouts, ra, dec, cutout_size = args
     """
-    url, fileout, overwrite, wait, cutouts, ra, dec, cutout_size = args
+    url, fileout, session, overwrite, wait, cutouts, ra, dec, cutout_size = args
     download_single_url(
         url,
         fileout=fileout,
+        session=session,
         overwrite=overwrite,
         wait=wait,
         cutouts=cutouts,


 def download_url(
     to_download_urls,
     download_location,
+    session=None,
     cutouts=False,
     show_progress=True,
     wait=None,

     radec=None,
     cutout_size=None,
     **kwargs,
-):
+    ):
     """ """
     #
     # - Dask Client

         delayed(download_single_url)(
             url,
             cutouts=cutouts,
+            session=session,
             fileout=fileout,
             show_progress=False,
             overwrite=overwrite,
             wait=wait,
             cookies=cookies,
             radec=radec,
             cutout_size=cutout_size,
-            **kwargs,
+            **kwargs
         )
         for url, fileout in zip(to_download_urls, download_location)
     ]

     # - MultiProcessing (or not)
     if nprocess is None:
         nprocess = 1
+
     elif nprocess < 1:
         raise ValueError("nprocess must be 1 or higher (None means 1)")

             download_single_url(
                 url,
                 cutouts=cutouts,
                 fileout=fileout,
+                session=session,
                 show_progress=show_progress,
                 overwrite=overwrite,
                 cookies=cookies,

         overwrite_ = [overwrite] * len(to_download_urls)
         wait_ = [wait] * len(to_download_urls)
         cutouts_ = [cutouts] * len(to_download_urls)
+        session_ = [session] * len(to_download_urls)
         if radec is None:
             radec = [None, None]

         args = zip(
             to_download_urls,
             download_location,
+            session_,
             overwrite_,
             wait_,
             cutouts_,

     if bar is not None:
         bar.update(len(to_download_urls))

-def download_fitsdata(url, cookies=None, **kwargs):
+def download_fitsdata(url, session=None, **kwargs):
     """ download a FITS file and return its content (nothing stored on disk)

     Parameters
     ----------
     url: str
         fits file url.

-    cookies: None
-        cookies to download the data. Smart to provide it
-        if already loaded, it takes ~0.5s to acquire.
+    session: requests.Session
+        session that already has the cookies.

     **kwargs goes to download_single_url

     Returns
     -------
     astropy.HDUList
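+
+    Example
+    -------
+    a minimal sketch; the url below is a hypothetical IRSA sciimg path::
+
+        session = open_irsa_session()
+        url = "https://irsa.ipac.caltech.edu/ibe/data/ztf/products/..."  # hypothetical
+        hdulist = download_fitsdata(url, session=session)
+        data = hdulist[0].data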
     """
-    out = download_single_url(url, cookies=cookies, **kwargs)
+    out = download_single_url(url, session=session, **kwargs)
     return fits.HDUList.fromstring(out.content)


 def download_single_url(
     url,
+    session=None,
     cutouts=False,
     radec=None,
     cutout_size=30,

     """Download the url target using requests.get.
     The data is returned (if fileout is None) or stored in `fileout`.
     """
+
     if wait is not None:
         waiting = wait if not randomize_wait else np.random.uniform(0, wait)
         time.sleep(waiting)

     # = Password and Username
     if cookies is None:
-        cookies = get_cookie(*_load_id_("irsa"))
-
+        # this attaches the cookies to the session if one is given,
+        # in which case the returned 'cookies' is None
+        cookies = get_cookie(session=session, update=False)
+
     # - requests options
-    download_prop = dict(cookies=cookies, stream=True)
-    for k, v in kwargs.items():
-        download_prop[k] = v
-
-    if cookies in ["no_cookies"]:
-        _ = download_prop.pop("cookies")
+    download_prop = {"stream": True, **kwargs}
+    if cookies not in ["no_cookies"] and cookies is not None:  # add cookies if needed
+        download_prop["cookies"] = cookies

     request_fnc = "get" if "data" not in download_prop else "post"
+    requests_or_session = requests if session is None else session

     # = Where should the data be saved?
     if fileout is not None:
         directory = os.path.dirname(fileout)

     else:
         download_prop["stream"] = False
-        return getattr(requests, request_fnc)(url, **download_prop)
+        return getattr(requests_or_session, request_fnc)(url, **download_prop)

     # With Progress bar?
     if not show_progress:
-        response = getattr(requests, request_fnc)(url, **download_prop)
+        response = getattr(requests_or_session, request_fnc)(url, **download_prop)
         if response.status_code == 200:
             with open(fileout, "wb") as f:
                 for data in response.iter_content(chunk):
                     f.write(data)

     else:
         from astropy.utils.console import ProgressBar

-        response = getattr(requests, request_fnc)(url, **download_prop)
+        response = getattr(requests_or_session, request_fnc)(url, **download_prop)
         if response.status_code == 200:
             chunk_barstep = 500
             f = open(fileout, "wb")