
Commit 1ebb5be

#153 fixed region selection, added updated geospatial_lat/lon_min/max to new netcdf attributes

Krzysztof (Chris) Bernat committed Mar 21, 2017
1 parent bbaad82 commit 1ebb5be
Showing 1 changed file with 68 additions and 27 deletions.
95 changes: 68 additions & 27 deletions cate/ds/esa_cci_odp.py
@@ -51,6 +51,7 @@
 import xarray as xr
 from collections import OrderedDict
 from datetime import datetime, timedelta
+from math import ceil, floor
 from typing import Sequence, Tuple, Optional, Any
 from xarray.backends.netCDF4_ import NetCDF4DataStore

@@ -88,6 +89,11 @@


 def get_data_store_path():
     return os.environ.get('CATE_ESA_CCI_ODP_DATA_STORE_PATH',
                           os.path.join(get_data_stores_path(), 'local'))
+
+
+def get_metadata_store_path():
+    return os.environ.get('CATE_ESA_CCI_ODP_DATA_STORE_PATH',
+                          os.path.join(get_data_stores_path(), 'esa_cci_odp'))
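
Note: both helpers follow the same environment-override pattern, and both read the same CATE_ESA_CCI_ODP_DATA_STORE_PATH variable, so a single export redirects the data and metadata locations alike. A minimal, self-contained sketch of the pattern; the env var name MY_STORE_PATH and the base directory are illustrative, not cate's:

    import os

    def store_path(env_var: str, base_dir: str, default_subdir: str) -> str:
        # Prefer an explicit override from the environment, else fall back
        # to a well-known subdirectory under the shared base directory.
        return os.environ.get(env_var, os.path.join(base_dir, default_subdir))

    # Mirrors the two helpers above, with illustrative names:
    data_dir = store_path('MY_STORE_PATH', '/tmp/data_stores', 'local')
    meta_dir = store_path('MY_STORE_PATH', '/tmp/data_stores', 'esa_cci_odp')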

@@ -319,7 +325,7 @@ def _load_index(self):
                               latest='true',
                               project='esacci')],
             cache_used=self._index_cache_used,
-            cache_dir=get_data_store_path(),
+            cache_dir=get_metadata_store_path(),
             cache_json_filename='dataset-list.json',
             cache_timestamp_filename='dataset-list-timestamp.json',
             cache_expiration_days=self._index_cache_expiration_days)
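
Redirecting cache_dir to get_metadata_store_path() moves the cached dataset list and its timestamp file out of the data store. The fetch-or-reuse logic such a JSON/timestamp file pair implies might look like the sketch below; the helper name and file handling are assumptions for illustration, not cate's actual fetching machinery:

    import json
    import os
    from datetime import datetime, timedelta

    def fetch_cached_json(cache_dir, json_name, stamp_name, expiration_days, fetch):
        """Return cached JSON if younger than expiration_days, else refetch and rewrite."""
        json_path = os.path.join(cache_dir, json_name)
        stamp_path = os.path.join(cache_dir, stamp_name)
        if os.path.exists(json_path) and os.path.exists(stamp_path):
            with open(stamp_path) as fp:
                stamp = datetime.strptime(fp.read().strip(), '%Y-%m-%d %H:%M:%S')
            if datetime.now() - stamp < timedelta(days=expiration_days):
                # Cache is fresh: reuse it without hitting the network.
                with open(json_path) as fp:
                    return json.load(fp)
        # Cache missing or stale: fetch, then persist payload and timestamp.
        data = fetch()
        os.makedirs(cache_dir, exist_ok=True)
        with open(json_path, 'w') as fp:
            json.dump(data, fp)
        with open(stamp_path, 'w') as fp:
            fp.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        return data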
@@ -479,6 +485,9 @@ def delete_local(self, time_range: Tuple[datetime, datetime]) -> int:
     def local_dataset_dir(self):
         return os.path.join(get_data_store_path(), self._master_id)

+    def local_metadata_dataset_dir(self):
+        return os.path.join(get_metadata_store_path(), self._master_id)
+
     def _find_files(self, time_range):
         requested_start_date, requested_end_date = time_range if time_range else (None, None)
         self._init_file_list()
@@ -566,7 +575,7 @@ def make_local(self,

         time_range = TimeRangeLike.convert(time_range) if time_range else None
         region = GeometryLike.convert(region) if region else None
-        var_names = VariableNamesLike.convert(var_names) if var_names else None  # type: List
+        var_names = VariableNamesLike.convert(var_names) if var_names else None  # type: Sequence

         compression_level = get_config_value('NETCDF_COMPRESSION_LEVEL', NETCDF_COMPRESSION_LEVEL)
         compression_enabled = True if compression_level > 0 else False
@@ -580,8 +589,14 @@
         else:
             protocol = _ODP_PROTOCOL_OPENDAP

+        if not os.path.exists(get_data_store_path()):
+            os.mkdir(get_data_store_path())
+
         local_path = os.path.join(get_data_store_path(), local_name)

+        if not os.path.exists(local_path):
+            os.mkdir(local_path)
+
         selected_file_list = self._find_files(time_range)

         if protocol == _ODP_PROTOCOL_OPENDAP:
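
The two guards above create the store root and then the dataset directory, each only when missing. On Python 3.2+, os.makedirs(..., exist_ok=True) would cover both levels in one call and avoid the check-then-create race; a sketch under that assumption, with illustrative paths:

    import os

    # One call creates every missing level and is a no-op when the target
    # already exists, so no os.path.exists() guard is needed.
    local_path = os.path.join('/tmp/data_stores/local', 'some_dataset')
    os.makedirs(local_path, exist_ok=True)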
@@ -593,34 +608,60 @@
                 file_name = os.path.basename(dataset_uri)
                 local_filepath = os.path.join(local_path, file_name)

-                remote_netcdf = NetCDF4DataStore(dataset_uri)
+                remote_netcdf = None
+                local_netcdf = None
+                try:
+                    remote_netcdf = NetCDF4DataStore(dataset_uri)

-                local_netcdf = NetCDF4DataStore(local_filepath, mode='w', persist=True)
-                local_netcdf.set_attributes(remote_netcdf.get_attrs())
+                    local_netcdf = NetCDF4DataStore(local_filepath, mode='w', persist=True)
+                    local_netcdf.set_attributes(remote_netcdf.get_attrs())

-                remote_dataset = xr.Dataset.load_store(remote_netcdf)
-
-                if region:
-                    [lat_min, lon_min, lat_max, lon_max] = region.bounds
-                    remote_dataset = remote_dataset.sel(drop=False,
-                                                        lat=slice(lat_min, lat_max),
-                                                        lon=slice(lon_min, lon_max))
-                if not var_names:
-                    var_names = [var_name for var_name in remote_netcdf.variables.keys()]
-
-                child_monitor.start(label=file_name, total_work=len(var_names))
-                for sel_var_name in var_names:
-                    var_dataset = remote_dataset.drop(
-                        [var_name for var_name in remote_dataset.variables.keys() if var_name != sel_var_name])
-                    if compression_enabled:
-                        var_dataset.variables.get(sel_var_name).encoding.update(encoding_update)
-                    local_netcdf.store_dataset(var_dataset)
-                    child_monitor.progress(work=1, msg=sel_var_name)
-
-                local_netcdf.sync()
-                remote_netcdf.close()
-
-                local_netcdf.close()
+                    remote_dataset = xr.Dataset.load_store(remote_netcdf)
+
+                    geo_lat_min = float(remote_dataset.attrs.get('geospatial_lat_min'))
+                    geo_lon_min = float(remote_dataset.attrs.get('geospatial_lon_min'))
+
+                    geo_lat_res = float(remote_dataset.attrs.get('geospatial_lon_resolution'))
+                    geo_lon_res = float(remote_dataset.attrs.get('geospatial_lat_resolution'))
+
+                    if region:
+                        [lat_min, lon_min, lat_max, lon_max] = region.bounds
+
+                        lat_min = floor((lat_min - geo_lat_min) / geo_lat_res)
+                        lat_max = ceil((lat_max - geo_lat_min) / geo_lat_res)
+                        lon_min = floor((lon_min - geo_lon_min) / geo_lon_res)
+                        lon_max = ceil((lon_max - geo_lon_min) / geo_lon_res)
+
+                        remote_dataset = remote_dataset.isel(drop=False,
+                                                             lat=slice(lat_min, lat_max),
+                                                             lon=slice(lon_min, lon_max))
+
+                    if not var_names:
+                        var_names = [var_name for var_name in remote_netcdf.variables.keys()]
+                    var_names.extend([coord_name for coord_name in remote_dataset.coords.keys()
+                                      if coord_name not in var_names])
+
+                    child_monitor.start(label=file_name, total_work=len(var_names))
+                    for sel_var_name in var_names:
+                        var_dataset = remote_dataset.drop(
+                            [var_name for var_name in var_names if var_name != sel_var_name])
+                        if compression_enabled:
+                            var_dataset.variables.get(sel_var_name).encoding.update(encoding_update)
+                        local_netcdf.store_dataset(var_dataset)
+                        child_monitor.progress(work=1, msg=sel_var_name)
+
+                    if region:
+                        local_netcdf.set_attribute('geospatial_lat_min', lat_min * geo_lat_res + geo_lat_min)
+                        local_netcdf.set_attribute('geospatial_lat_max', lat_max * geo_lat_res + geo_lat_min)
+                        local_netcdf.set_attribute('geospatial_lon_min', lon_min * geo_lon_res + geo_lon_min)
+                        local_netcdf.set_attribute('geospatial_lon_max', lon_max * geo_lon_res + geo_lon_min)
+                except:
+                    raise
+                finally:
+                    if remote_netcdf:
+                        remote_netcdf.close()
+                    if local_netcdf:
+                        local_netcdf.close()

                 child_monitor.done()

             monitor.done()
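
This hunk is the core of the region fix: the requested geographic bounds are converted to whole-cell indices with floor/ceil (so the subset never undershoots the request), isel then slices by position rather than by coordinate value, and the same indices are mapped back to degrees to rewrite the geospatial_* attributes on the local copy. Note that geo_lat_res is read from the 'geospatial_lon_resolution' attribute and geo_lon_res from 'geospatial_lat_resolution' in the committed code; the two only coincide for square grid cells. A self-contained numeric sketch of the round trip, assuming a global 0.5-degree grid; the values are illustrative, not from the commit:

    from math import ceil, floor

    geo_lat_min, geo_lon_min = -90.0, -180.0   # grid origin (degrees)
    geo_lat_res = geo_lon_res = 0.5            # cell size (degrees)

    # Requested region, roughly the Alps.
    lat_lo, lat_hi = 45.2, 47.9
    lon_lo, lon_hi = 5.7, 13.1

    # Degrees -> cell indices; floor/ceil widen to whole cells so the
    # subset always covers the requested box.
    i_lat_min = floor((lat_lo - geo_lat_min) / geo_lat_res)   # 270
    i_lat_max = ceil((lat_hi - geo_lat_min) / geo_lat_res)    # 276
    i_lon_min = floor((lon_lo - geo_lon_min) / geo_lon_res)   # 371
    i_lon_max = ceil((lon_hi - geo_lon_min) / geo_lon_res)    # 387

    # Indices -> degrees, the same formula used for the new attributes.
    new_lat_min = i_lat_min * geo_lat_res + geo_lat_min       # 45.0 <= 45.2
    new_lat_max = i_lat_max * geo_lat_res + geo_lat_min       # 48.0 >= 47.9
    new_lon_min = i_lon_min * geo_lon_res + geo_lon_min       # 5.5  <= 5.7
    new_lon_max = i_lon_max * geo_lon_res + geo_lon_min       # 13.5 >= 13.1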
@@ -671,7 +712,7 @@ def _init_file_list(self, monitor: Monitor=Monitor.NONE):
             fetch_json_args=[self._master_id, self._dataset_id],
             fetch_json_kwargs=dict(monitor=monitor),
             cache_used=self._data_store.index_cache_used,
-            cache_dir=self.local_dataset_dir(),
+            cache_dir=self.local_metadata_dataset_dir(),
             cache_json_filename='file-list.json',
             cache_timestamp_filename='file-list-timestamp.txt',
             cache_expiration_days=self._data_store.index_cache_expiration_days)
