diff --git a/cate/ds/esa_cci_odp.py b/cate/ds/esa_cci_odp.py
index cbbadd852..4008af334 100644
--- a/cate/ds/esa_cci_odp.py
+++ b/cate/ds/esa_cci_odp.py
@@ -45,6 +45,7 @@
 import os
 import os.path
 import re
+import urllib.parse
 import urllib.request
 
 import xarray as xr
@@ -55,14 +56,12 @@
 from cate.conf import get_config_value
 from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
-from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, open_xarray_dataset
-from cate.core.ds import get_data_stores_path
+from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, \
+    open_xarray_dataset, get_data_stores_path
 from cate.core.types import GeometryLike, TimeRange, TimeRangeLike, VariableNamesLike
 from cate.util.monitor import Monitor
 
-
-
 _ESGF_CEDA_URL = "https://esgf-index1.ceda.ac.uk/esg-search/search/"
 
 _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
@@ -453,58 +452,10 @@ def sync(self,
              time_range: Tuple[datetime, datetime]=None,
              protocol: str=None,
              monitor: Monitor=Monitor.NONE) -> Tuple[int, int]:
-        selected_file_list = self._find_files(time_range)
-        if not selected_file_list:
-            return 0, 0
-
-        if protocol is None:
-            protocol = _ODP_PROTOCOL_HTTP
 
         if protocol == _ODP_PROTOCOL_HTTP:
-            dataset_dir = self.local_dataset_dir()
-
-            # Find outdated files
-            outdated_file_list = []
-            for file_rec in selected_file_list:
-                filename, _, _, file_size, url = file_rec
-                dataset_file = os.path.join(dataset_dir, filename)
-                # todo (forman, 20160915): must perform better checks on dataset_file if it is...
-                # ... outdated or incomplete or corrupted.
-                # JSON also includes "checksum" and "checksum_type" fields.
-                if not os.path.isfile(dataset_file) or (file_size and os.path.getsize(dataset_file) != file_size):
-                    outdated_file_list.append(file_rec)
-
-            if not outdated_file_list:
-                # No sync needed
-                return 0, len(selected_file_list)
-
-            with monitor.starting('Sync ' + self.name, len(outdated_file_list)):
-                bytes_to_download = sum([file_rec[3] for file_rec in outdated_file_list])
-                dl_stat = _DownloadStatistics(bytes_to_download)
-
-                file_number = 1
-                dataset_dir = self.local_dataset_dir()
-                for filename, _, _, file_size, url in outdated_file_list:
-                    if monitor.is_cancelled():
-                        raise InterruptedError
-                    dataset_file = os.path.join(dataset_dir, filename)
-                    sub_monitor = monitor.child(work=1.0)
-
-                    # noinspection PyUnusedLocal
-                    def reporthook(block_number, read_size, total_file_size):
-                        dl_stat.handle_chunk(read_size)
-                        if monitor.is_cancelled():
-                            raise InterruptedError
-                        sub_monitor.progress(work=read_size, msg=str(dl_stat))
-
-                    sub_monitor_msg = "file %d of %d" % (file_number, len(outdated_file_list))
-                    with sub_monitor.starting(sub_monitor_msg, file_size):
-                        urllib.request.urlretrieve(url[protocol], filename=dataset_file, reporthook=reporthook)
-                    file_number += 1
-
-            return len(outdated_file_list), len(selected_file_list)
-        else:
-            return 0, 0
+            self.make_local(self._master_id(), None, time_range, None, None, monitor)
+        return 0, 0
 
     def delete_local(self, time_range: Tuple[datetime, datetime]) -> int:
         selected_file_list = self._find_files(time_range)
@@ -591,6 +542,16 @@ def open_dataset(self,
         except OSError as e:
             raise IOError("Files: {} caused:\nOSError({}): {}".format(files, e.errno, e.strerror))
 
+    @staticmethod
+    def _get_urls_list(files_description_list, protocol) -> Sequence[str]:
+        """
+        Return the URL list for the given protocol, extracted from the ESGF-specific file description records (JSON).
+        :param files_description_list: file description records as returned by _find_files()
+        :param protocol: protocol key, e.g. _ODP_PROTOCOL_HTTP or _ODP_PROTOCOL_OPENDAP
+        :return: list of URLs, with any trailing '.html' suffix removed
+        """
+        return [file_rec[4][protocol].replace('.html', '') for file_rec in files_description_list]
+
     def make_local(self,
                    local_name: str,
                    local_id: str = None,
@@ -617,12 +578,13 @@ def make_local(self,
         else:
             protocol = _ODP_PROTOCOL_OPENDAP
 
-        if protocol == _ODP_PROTOCOL_OPENDAP:
+        local_path = os.path.join(get_data_store_path(), local_name)
 
-            local_path = os.path.join(get_data_store_path(), local_name)
+        selected_file_list = self._find_files(time_range)
 
-            selected_file_list = self._find_files(time_range)
-            files = [file_rec[4][protocol].replace('.html', '') for file_rec in selected_file_list]
+        if protocol == _ODP_PROTOCOL_OPENDAP:
+
+            files = self._get_urls_list(selected_file_list, protocol)
 
             for dataset_uri in files:
                 child_monitor = monitor.child(work=1)
@@ -630,22 +592,21 @@ def make_local(self,
                 local_filepath = os.path.join(local_path, file_name)
 
                 remote_netcdf = NetCDF4DataStore(dataset_uri)
-                local_netcdf = NetCDF4DataStore(local_filepath, mode='w', persist=True)
+                local_netcdf = NetCDF4DataStore(local_filepath, mode='w', persist=True)
                 local_netcdf.set_attributes(remote_netcdf.get_attrs())
 
-                child_monitor.start(label=file_name, total_work=len(var_names))
-
-                if not var_names:
-                    var_names = [var_name for var_name in remote_netcdf.variables.keys()]
-
                 remote_dataset = xr.Dataset.load_store(remote_netcdf)
 
                 if region:
+                    # shapely bounds are (minx, miny, maxx, maxy), i.e. lon before lat
+                    lon_min, lat_min, lon_max, lat_max = region.bounds
                     remote_dataset = remote_dataset.sel(drop=False,
-                                                        lat=slice(region.bounds[0], region.bounds[0]),
-                                                        lon=slice(region.bounds[2], region.bounds[3]))
+                                                        lat=slice(lat_min, lat_max),
+                                                        lon=slice(lon_min, lon_max))
+                if not var_names:
+                    var_names = [var_name for var_name in remote_netcdf.variables.keys()]
+                child_monitor.start(label=file_name, total_work=len(var_names))
 
                 for sel_var_name in var_names:
                     var_dataset = remote_dataset.drop(
                         [var_name for var_name in remote_dataset.variables.keys() if var_name != sel_var_name])
@@ -663,9 +624,42 @@
 
             monitor.done()
         else:
-            # TODO (kbernat): implement me! see sync()
-            raise NotImplementedError('EsaCciOdpDataSource.make_local() '
-                                      'HTTP download is not yet implemented')
+            outdated_file_list = []
+            for file_rec in selected_file_list:
+                filename, _, _, file_size, url = file_rec
+                dataset_file = os.path.join(local_path, filename)
+                # todo (forman, 20160915): must perform better checks on dataset_file if it is...
+                # ... outdated or incomplete or corrupted.
+                # JSON also includes "checksum" and "checksum_type" fields.
+                if not os.path.isfile(dataset_file) or (file_size and os.path.getsize(dataset_file) != file_size):
+                    outdated_file_list.append(file_rec)
+
+            if not outdated_file_list:
+                return  # No sync needed
+
+            with monitor.starting('Sync ' + self.name, len(outdated_file_list)):
+                bytes_to_download = sum([file_rec[3] for file_rec in outdated_file_list])
+                dl_stat = _DownloadStatistics(bytes_to_download)
+
+                file_number = 1
+
+                for filename, _, _, file_size, url in outdated_file_list:
+                    if monitor.is_cancelled():
+                        raise InterruptedError
+                    dataset_file = os.path.join(local_path, filename)
+                    sub_monitor = monitor.child(work=1.0)
+
+                    # noinspection PyUnusedLocal
+                    def reporthook(block_number, read_size, total_file_size):
+                        dl_stat.handle_chunk(read_size)
+                        if monitor.is_cancelled():
+                            raise InterruptedError
+                        sub_monitor.progress(work=read_size, msg=str(dl_stat))
+
+                    sub_monitor_msg = "file %d of %d" % (file_number, len(outdated_file_list))
+                    with sub_monitor.starting(sub_monitor_msg, file_size):
+                        urllib.request.urlretrieve(url[protocol], filename=dataset_file, reporthook=reporthook)
+                    file_number += 1
 
     def _init_file_list(self, monitor: Monitor=Monitor.NONE):
         if self._file_list:
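
Reviewer note (illustrative, not part of the patch): sync() now delegates to make_local(), which hosts both the OPENDAP subsetting path and the HTTP download logic that previously lived in sync(). A minimal usage sketch of the reworked entry point, assuming a registered 'esa_cci_odp' data store and that its query() accepts a data source id; the id, local name, and time range below are placeholders:

    from datetime import datetime

    from cate.core.ds import DATA_STORE_REGISTRY
    from cate.util.monitor import Monitor

    # Look up the ODP data store and one of its data sources (placeholder id).
    data_store = DATA_STORE_REGISTRY.get_data_store('esa_cci_odp')
    data_source = data_store.query('esacci.OZONE.mon.L3.NP')[0]

    # Materialize a local copy named 'ozone_2007' (placeholder) for one year.
    # Depending on the resolved protocol, this either subsets via OPENDAP or
    # downloads the files over HTTP into <data-store-path>/ozone_2007.
    data_source.make_local('ozone_2007',
                           time_range=(datetime(2007, 1, 1), datetime(2007, 12, 31)),
                           monitor=Monitor.NONE)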