This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

#153 make_local with http protocol for complete (no region or variable selection) datasets
Krzysztof (Chris) Bernat committed Mar 16, 2017
1 parent 1f6a84a commit c684fd0
Showing 1 changed file with 63 additions and 69 deletions.
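
In short, sync() now delegates to make_local(), whose HTTP branch downloads the complete remote files. A minimal usage sketch, assuming (per the commit title) that make_local() picks the HTTP protocol when neither a region nor a variable selection is given; the function name, data source handle, and time range here are illustrative, not part of the commit:

from datetime import datetime
from cate.util.monitor import Monitor

def fetch_complete_dataset(data_source, local_name='local_copy'):
    """Fetch a complete remote dataset over HTTP via make_local().

    With region=None and var_names=None, make_local() is expected to take
    the HTTP branch and download whole files with urllib.request.urlretrieve()
    instead of subsetting them variable-by-variable over OPeNDAP.
    """
    time_range = (datetime(2007, 1, 1), datetime(2007, 12, 31))
    data_source.make_local(local_name, None, time_range, None, None, Monitor.NONE)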
132 changes: 63 additions & 69 deletions cate/ds/esa_cci_odp.py
@@ -45,6 +45,7 @@
import os
import os.path
import re

import urllib.parse
import urllib.request
import xarray as xr
@@ -55,14 +56,12 @@

from cate.conf import get_config_value
from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, open_xarray_dataset
from cate.core.ds import get_data_stores_path
from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, \
open_xarray_dataset, get_data_stores_path
from cate.core.types import GeometryLike, TimeRange, TimeRangeLike, VariableNamesLike
from cate.util.monitor import Monitor




_ESGF_CEDA_URL = "https://esgf-index1.ceda.ac.uk/esg-search/search/"

_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
@@ -453,58 +452,10 @@ def sync(self,
time_range: Tuple[datetime, datetime]=None,
protocol: str=None,
monitor: Monitor=Monitor.NONE) -> Tuple[int, int]:
selected_file_list = self._find_files(time_range)
if not selected_file_list:
return 0, 0

if protocol is None:
protocol = _ODP_PROTOCOL_HTTP

if protocol == _ODP_PROTOCOL_HTTP:
dataset_dir = self.local_dataset_dir()

# Find outdated files
outdated_file_list = []
for file_rec in selected_file_list:
filename, _, _, file_size, url = file_rec
dataset_file = os.path.join(dataset_dir, filename)
# todo (forman, 20160915): must perform better checks on dataset_file if it is...
# ... outdated or incomplete or corrupted.
# JSON also includes "checksum" and "checksum_type" fields.
if not os.path.isfile(dataset_file) or (file_size and os.path.getsize(dataset_file) != file_size):
outdated_file_list.append(file_rec)

if not outdated_file_list:
# No sync needed
return 0, len(selected_file_list)

with monitor.starting('Sync ' + self.name, len(outdated_file_list)):
bytes_to_download = sum([file_rec[3] for file_rec in outdated_file_list])
dl_stat = _DownloadStatistics(bytes_to_download)

file_number = 1
dataset_dir = self.local_dataset_dir()
for filename, _, _, file_size, url in outdated_file_list:
if monitor.is_cancelled():
raise InterruptedError
dataset_file = os.path.join(dataset_dir, filename)
sub_monitor = monitor.child(work=1.0)

# noinspection PyUnusedLocal
def reporthook(block_number, read_size, total_file_size):
dl_stat.handle_chunk(read_size)
if monitor.is_cancelled():
raise InterruptedError
sub_monitor.progress(work=read_size, msg=str(dl_stat))

sub_monitor_msg = "file %d of %d" % (file_number, len(outdated_file_list))
with sub_monitor.starting(sub_monitor_msg, file_size):
urllib.request.urlretrieve(url[protocol], filename=dataset_file, reporthook=reporthook)
file_number += 1

return len(outdated_file_list), len(selected_file_list)
else:
return 0, 0
self.make_local(self._master_id(), None, time_range, None, None, monitor)
return 0, 0

def delete_local(self, time_range: Tuple[datetime, datetime]) -> int:
selected_file_list = self._find_files(time_range)
@@ -591,6 +542,16 @@ def open_dataset(self,
except OSError as e:
raise IOError("Files: {} caused:\nOSError({}): {}".format(files, e.errno, e.strerror))

@staticmethod
def _get_urls_list(files_description_list, protocol) -> Sequence[str]:
"""
Returns urls list extracted from reference esgf specific files description json list
:param files_description_list:
:param protocol:
:return:
"""
return [file_rec[4][protocol].replace('.html', '') for file_rec in files_description_list]
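
To illustrate, here is a made-up file record in the shape this code assumes, a 5-tuple whose last element maps protocol names to URLs; the protocol key and URLs below are hypothetical:

# Hypothetical record shaped like the tuples _find_files() yields:
# (filename, start_time, end_time, file_size, {protocol: url})
file_rec = ('sst_20070101.nc', None, None, 1024,
            {'HTTPServer': 'http://host/fileServer/sst_20070101.nc.html'})

# The method selects the URL for the requested protocol and strips the
# trailing '.html' that the ESGF file descriptions attach:
urls = [rec[4]['HTTPServer'].replace('.html', '') for rec in [file_rec]]
assert urls == ['http://host/fileServer/sst_20070101.nc']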

def make_local(self,
local_name: str,
local_id: str = None,
@@ -617,35 +578,35 @@ def make_local(self,
else:
protocol = _ODP_PROTOCOL_OPENDAP

if protocol == _ODP_PROTOCOL_OPENDAP:
local_path = os.path.join(get_data_store_path(), local_name)

local_path = os.path.join(get_data_store_path(), local_name)
selected_file_list = self._find_files(time_range)

selected_file_list = self._find_files(time_range)
files = [file_rec[4][protocol].replace('.html', '') for file_rec in selected_file_list]
if protocol == _ODP_PROTOCOL_OPENDAP:

files = self._get_urls_list(selected_file_list, protocol)
for dataset_uri in files:
child_monitor = monitor.child(work=1)

file_name = os.path.basename(dataset_uri)
local_filepath = os.path.join(local_path, file_name)

remote_netcdf = NetCDF4DataStore(dataset_uri)
local_netcdf = NetCDF4DataStore(local_filepath, mode='w', persist=True)

local_netcdf = NetCDF4DataStore(local_filepath, mode='w', persist=True)
local_netcdf.set_attributes(remote_netcdf.get_attrs())

child_monitor.start(label=file_name, total_work=len(var_names))

if not var_names:
var_names = [var_name for var_name in remote_netcdf.variables.keys()]

remote_dataset = xr.Dataset.load_store(remote_netcdf)

if region:
[lat_min, lon_min, lat_max, lon_max] = region.bounds
remote_dataset = remote_dataset.sel(drop=False,
lat=slice(region.bounds[0], region.bounds[0]),
lon=slice(region.bounds[2], region.bounds[3]))
lat=slice(lat_min, lat_max),
lon=slice(lon_min, lon_max))
if not var_names:
var_names = [var_name for var_name in remote_netcdf.variables.keys()]

child_monitor.start(label=file_name, total_work=len(var_names))
for sel_var_name in var_names:
var_dataset = remote_dataset.drop(
[var_name for var_name in remote_dataset.variables.keys() if var_name != sel_var_name])
@@ -663,9 +624,42 @@
monitor.done()

else:
# TODO (kbernat): implement me! see sync()
raise NotImplementedError('EsaCciOdpDataSource.make_local() '
'HTTP download is not yet implemented')
outdated_file_list = []
for file_rec in selected_file_list:
filename, _, _, file_size, url = file_rec
dataset_file = os.path.join(local_path, filename)
# todo (forman, 20160915): must perform better checks on dataset_file if it is...
# ... outdated or incomplete or corrupted.
# JSON also includes "checksum" and "checksum_type" fields.
if not os.path.isfile(dataset_file) or (file_size and os.path.getsize(dataset_file) != file_size):
outdated_file_list.append(file_rec)

if not outdated_file_list:
return # No sync needed

with monitor.starting('Sync ' + self.name, len(outdated_file_list)):
bytes_to_download = sum([file_rec[3] for file_rec in outdated_file_list])
dl_stat = _DownloadStatistics(bytes_to_download)

file_number = 1

for filename, _, _, file_size, url in outdated_file_list:
if monitor.is_cancelled():
raise InterruptedError
dataset_file = os.path.join(local_path, filename)
sub_monitor = monitor.child(work=1.0)

# noinspection PyUnusedLocal
def reporthook(block_number, read_size, total_file_size):
dl_stat.handle_chunk(read_size)
if monitor.is_cancelled():
raise InterruptedError
sub_monitor.progress(work=read_size, msg=str(dl_stat))

sub_monitor_msg = "file %d of %d" % (file_number, len(outdated_file_list))
with sub_monitor.starting(sub_monitor_msg, file_size):
urllib.request.urlretrieve(url[protocol], filename=dataset_file, reporthook=reporthook)
file_number += 1
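
For reference, the reporthook protocol used above comes from the standard library: urllib.request.urlretrieve() invokes the hook with (block_number, block_size, total_size) as data arrives. A self-contained sketch with a placeholder URL:

import urllib.request

def reporthook(block_number, block_size, total_size):
    # Invoked by urlretrieve() after each block is received.
    downloaded = block_number * block_size
    if total_size > 0:
        percent = min(100.0, downloaded * 100.0 / total_size)
        print('%.1f%% of %d bytes' % (percent, total_size))

# Placeholder URL; substitute a real file URL to try this out.
urllib.request.urlretrieve('http://example.com/data.nc',
                           filename='data.nc', reporthook=reporthook)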

def _init_file_list(self, monitor: Monitor=Monitor.NONE):
if self._file_list:
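A side note on the region fix in the make_local hunk above: the old code passed region.bounds[0] as both endpoints of the latitude slice, so the latitude selection collapsed to at most a single row. The corrected pattern is plain xarray label-based slicing; a synthetic example:

import numpy as np
import xarray as xr

# Synthetic 1-degree global grid, purely for illustration.
ds = xr.Dataset(
    {'sst': (('lat', 'lon'), np.zeros((180, 360)))},
    coords={'lat': np.arange(-89.5, 90.0, 1.0),
            'lon': np.arange(-179.5, 180.0, 1.0)})

# .sel with slices keeps every cell whose coordinate falls inside the range;
# each slice must run from the minimum bound to the maximum bound.
subset = ds.sel(lat=slice(-10.0, 10.0), lon=slice(20.0, 40.0))
assert subset.sst.shape == (20, 20)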
