This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

#269 avoid making a copy of empty datasources #351

Merged · 1 commit · Sep 7, 2017
32 changes: 23 additions & 9 deletions cate/ds/esa_cci_odp.py
@@ -58,7 +58,7 @@
 from cate.conf import get_config_value
 from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
 from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, \
-    open_xarray_dataset, get_data_stores_path, find_data_sources
+    open_xarray_dataset, get_data_stores_path
 from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike, VarNames
 from cate.ds.local import add_to_data_store_registry, LocalDataSource
 from cate.util.monitor import Monitor
@@ -591,7 +591,14 @@ def update_local(self,
                      time_range: TimeRangeLike.TYPE,
                      monitor: Monitor = Monitor.NONE) -> bool:
 
-        data_sources = find_data_sources(id=local_id)  # type: Sequence['DataSource']
+        local_store = DATA_STORE_REGISTRY.get_data_store('local')
+        if not local_store:
+            add_to_data_store_registry()
+            local_store = DATA_STORE_REGISTRY.get_data_store('local')
+        if not local_store:
+            raise ValueError('Cannot initialize `local` DataStore')
+
+        data_sources = local_store.query(id=local_id)  # type: Sequence['DataSource']
         data_source = next((ds for ds in data_sources if isinstance(ds, LocalDataSource) and
                             ds.id == local_id), None)  # type: LocalDataSource
         if not data_source:
@@ -621,9 +628,11 @@ def update_local(self,
         if to_add:
             for time_range_to_add in to_add:
                 self._make_local(data_source, time_range_to_add, None, data_source.variables_info, monitor)
+            data_source.meta_info['temporal_coverage_start'] = time_range[0]
+            data_source.meta_info['temporal_coverage_end'] = time_range[1]
+            data_source.update_temporal_coverage(time_range)
 
-        # TODO (chris): forman added False (?) to make signature happy
-        return False
+        return bool(to_remove or to_add)
 
     def delete_local(self, time_range: TimeRangeLike.TYPE) -> int:
 
@@ -966,11 +975,16 @@ def make_local(self,
 
         local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
                                                   time_range, var_names, meta_info=local_meta_info, lock_file=True)
-        self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
-        if local_ds.is_empty:
-            local_store.remove_data_source(local_ds)
-            return None
-        return local_ds
+        if local_ds:
+            if not local_ds.is_complete:
+                self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
+
+            if local_ds.is_empty:
+                local_store.remove_data_source(local_ds)
+                return None
+            local_store.register_ds(local_ds)
+            return local_ds
+        return None
 
     def _init_file_list(self, monitor: Monitor = Monitor.NONE):
         if self._file_list:
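The update_local() hunk above swaps the global find_data_sources() helper for a direct lookup of the 'local' data store, registering the default stores once on a miss and failing loudly if that still does not help. Below is a minimal, self-contained sketch of that lookup-with-fallback pattern; _Registry, add_default_stores() and get_local_store() are hypothetical stand-ins for cate's DATA_STORE_REGISTRY and add_to_data_store_registry(), not the real API.

```python
class _Registry:
    """Stand-in for cate's DATA_STORE_REGISTRY (a name -> data store mapping)."""

    def __init__(self):
        self._stores = {}

    def get_data_store(self, name):
        return self._stores.get(name)

    def add_data_store(self, name, store):
        self._stores[name] = store


REGISTRY = _Registry()


def add_default_stores():
    # In cate, add_to_data_store_registry() registers the default 'local' store;
    # here we just fake a store object that supports query(id=...).
    class _LocalStore:
        def query(self, id=None):
            return []  # no data sources in this sketch

    REGISTRY.add_data_store('local', _LocalStore())


def get_local_store():
    local_store = REGISTRY.get_data_store('local')
    if not local_store:
        add_default_stores()
        local_store = REGISTRY.get_data_store('local')
    if not local_store:
        raise ValueError('Cannot initialize `local` DataStore')
    return local_store


# The first call triggers the fallback registration, later calls hit the registry.
assert get_local_store() is REGISTRY.get_data_store('local')
assert get_local_store().query(id='some.data.source') == []
```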
79 changes: 64 additions & 15 deletions cate/ds/local.py
@@ -52,7 +52,7 @@
 
 from cate.conf import get_config_value
 from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
-from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, open_xarray_dataset, find_data_sources
+from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, open_xarray_dataset
 from cate.core.ds import get_data_stores_path
 from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike
 from cate.util.monitor import Monitor
@@ -127,6 +127,7 @@ def __init__(self,
         self._reference_name = reference_name
 
         self._meta_info = meta_info if meta_info else OrderedDict()
+        self._is_complete = True
 
     def _resolve_file_path(self, path) -> Sequence:
         return glob(os.path.join(self._data_store.data_store_path, path))
@@ -351,18 +352,31 @@ def make_local(self,
 
         local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
                                                   meta_info=self.meta_info)
-        self._make_local(local_ds, time_range, region, var_names, monitor)
-        if local_ds.is_empty:
-            local_store.remove_data_source(local_ds)
-            return None
-        return local_ds
+        if local_ds:
+            if not local_ds.is_complete:
+                self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
+
+            if local_ds.is_empty:
+                local_store.remove_data_source(local_ds)
+                return None
+
+            local_store.register_ds(local_ds)
+            return local_ds
+        return None
 
     def update_local(self,
                      local_id: str,
                      time_range: TimeRangeLike.TYPE,
                      monitor: Monitor = Monitor.NONE) -> bool:
 
-        data_sources = find_data_sources(id=local_id)  # type: Sequence['DataSource']
+        local_store = DATA_STORE_REGISTRY.get_data_store('local')
+        if not local_store:
+            add_to_data_store_registry()
+            local_store = DATA_STORE_REGISTRY.get_data_store('local')
+        if not local_store:
+            raise ValueError('Cannot initialize `local` DataStore')
+
+        data_sources = local_store.query(id=local_id)  # type: Sequence['DataSource']
         data_source = next((ds for ds in data_sources if isinstance(ds, LocalDataSource) and
                             ds.id == local_id), None)  # type: LocalDataSource
         if not data_source:
@@ -391,6 +405,10 @@ def update_local(self,
         if to_add:
             for time_range_to_add in to_add:
                 self._make_local(data_source, time_range_to_add, None, data_source.variables_info, monitor)
+            data_source.meta_info['temporal_coverage_start'] = time_range[0]
+            data_source.meta_info['temporal_coverage_end'] = time_range[1]
+            data_source.update_temporal_coverage(time_range)
+
         return bool(to_remove or to_add)
 
     def add_dataset(self, file, time_coverage: TimeRangeLike.TYPE = None, update: bool = False,
@@ -424,6 +442,15 @@ def _extend_temporal_coverage(self, time_range: TimeRangeLike.TYPE):
                 self._temporal_coverage = tuple([time_range[0], self._temporal_coverage[1]])
         else:
             self._temporal_coverage = time_range
+        self.save()
+
+    def update_temporal_coverage(self, time_range: TimeRangeLike.TYPE):
+        """
+
+        :param time_range: Time range to be added to data source temporal coverage
+        :return:
+        """
+        self._extend_temporal_coverage(time_range)
 
     def _reduce_temporal_coverage(self, time_range: TimeRangeLike.TYPE):
         """
@@ -488,6 +515,14 @@ def variables_info(self):
     def info_string(self):
         return 'Files: %s' % (' '.join(self._files))
 
+    @property
+    def is_complete(self) -> bool:
+        """
+        Return a DataSource creation state
+        :return:
+        """
+        return self._is_complete
+
     @property
     def is_empty(self) -> bool:
         """
@@ -496,6 +531,14 @@ def is_empty(self) -> bool:
         """
         return not self._files or len(self._files) == 0
 
+    def set_completed(self, state: bool):
+        """
+        Sets state of DataSource creation/completion
+        :param state: Is DataSource completed
+        :return:
+        """
+        self._is_complete = state
+
     def _repr_html_(self):
         import html
         return '<table style="border:0;">\n' \
@@ -579,6 +622,7 @@ def add_pattern(self, data_source_id: str, files: Union[str, Sequence[str]] = None,
                 is_first_file = False
             else:
                 data_source.add_dataset(file)
+        self.register_ds(data_source)
         return data_source
 
     def remove_data_source(self, data_source: Union[str, DataSource], remove_files: bool = True):
@@ -591,7 +635,12 @@ def remove_data_source(self, data_source: Union[str, DataSource], remove_files: bool = True):
            os.remove(file_name)
         if remove_files:
             shutil.rmtree(os.path.join(self._store_dir, data_source.id), ignore_errors=True)
-        self._data_sources.remove(data_source)
+        if data_source in self._data_sources:
+            self._data_sources.remove(data_source)
+
+    def register_ds(self, data_source: DataSource):
+        data_source.set_completed(True)
+        self._data_sources.append(data_source)
 
     def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = None,
                            reference_type: str = None, reference_name: str = None,
@@ -602,7 +651,7 @@ def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = None,
             data_source_id = '%s.%s' % (self.id, data_source_id)
         lock_filename = '{}.lock'.format(data_source_id)
         lock_filepath = os.path.join(self._store_dir, lock_filename)
-        existing_ds = None
+        data_source = None
         for ds in self._data_sources:
             if ds.id == data_source_id:
                 if lock_file and os.path.isfile(lock_filepath):
@@ -614,23 +663,23 @@ def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = None,
                     # ds.temporal_coverage() == time_range and
                     if ds.spatial_coverage() == region \
                             and ds.variables_info == var_names:
-                        existing_ds = ds
+                        data_source = ds
+                        data_source.set_completed(False)
                         break
                 raise ValueError("Local data store '{}' already contains a data source named '{}'"
                                  .format(self.id, data_source_id))
-        if existing_ds:
-            data_source = existing_ds
-        else:
+        if not data_source:
             data_source = LocalDataSource(data_source_id, files=[], data_store=self, spatial_coverage=region,
                                           reference_type=reference_type, reference_name=reference_name,
                                           meta_info=meta_info)
+            data_source.set_completed(False)
+            self._save_data_source(data_source)
+
         if lock_file:
             pid = os.getpid()
             with open(lock_filepath, 'w') as lock_file:
                 lock_file.write(str(pid))
 
-        self._save_data_source(data_source)
-        self._data_sources.append(data_source)
         return data_source
 
     @property
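Taken together, the two files now share the lifecycle that gives the PR its title: create_data_source() hands back a data source marked incomplete, _make_local() fills it, an empty result is removed instead of kept, and register_ds() marks the surviving source complete. Below is a minimal, self-contained sketch of that lifecycle; SketchDataSource, SketchDataStore and the bare-bones make_local() are hypothetical stand-ins for cate's LocalDataSource, LocalDataStore and DataSource.make_local(), not the real implementations.

```python
class SketchDataSource:
    """Stand-in for LocalDataSource: tracks files and a completion flag."""

    def __init__(self, ds_id: str):
        self.id = ds_id
        self._files = []
        self._is_complete = True

    @property
    def is_empty(self) -> bool:
        # Mirrors LocalDataSource.is_empty: no files means nothing was copied.
        return not self._files

    @property
    def is_complete(self) -> bool:
        return self._is_complete

    def set_completed(self, state: bool):
        self._is_complete = state

    def add_dataset(self, path: str):
        self._files.append(path)


class SketchDataStore:
    """Stand-in for LocalDataStore: create, register and remove data sources."""

    def __init__(self):
        self._data_sources = []

    def create_data_source(self, ds_id: str) -> SketchDataSource:
        # Reuse an existing source with the same id if there is one; either way
        # the source stays incomplete until register_ds() is called.
        for ds in self._data_sources:
            if ds.id == ds_id:
                ds.set_completed(False)
                return ds
        ds = SketchDataSource(ds_id)
        ds.set_completed(False)
        return ds

    def register_ds(self, ds: SketchDataSource):
        ds.set_completed(True)
        if ds not in self._data_sources:
            self._data_sources.append(ds)

    def remove_data_source(self, ds: SketchDataSource):
        if ds in self._data_sources:
            self._data_sources.remove(ds)


def make_local(store: SketchDataStore, ds_id: str, files):
    """Bare-bones version of the make_local() flow introduced by this PR."""
    local_ds = store.create_data_source(ds_id)
    if not local_ds.is_complete:
        for f in files:  # stands in for _make_local() downloading/copying data
            local_ds.add_dataset(f)
    if local_ds.is_empty:
        store.remove_data_source(local_ds)  # do not keep a copy of an empty data source
        return None
    store.register_ds(local_ds)
    return local_ds


store = SketchDataStore()
assert make_local(store, 'empty.ds', []) is None           # nothing copied -> not kept
assert make_local(store, 'ok.ds', ['a.nc']) is not None    # populated -> registered and complete
```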