From ed94ac7f0f2a57aea774ef6553b40d0437ba7a7b Mon Sep 17 00:00:00 2001
From: "Krzysztof (Chris) Bernat"
Date: Thu, 7 Sep 2017 14:54:01 +0100
Subject: [PATCH] 269 avoid making a copy of empty datasources

---
 cate/ds/esa_cci_odp.py | 32 ++++++++++++-----
 cate/ds/local.py       | 79 ++++++++++++++++++++++++++++++++++--------
 2 files changed, 87 insertions(+), 24 deletions(-)

diff --git a/cate/ds/esa_cci_odp.py b/cate/ds/esa_cci_odp.py
index 49a97890a..98bed36da 100644
--- a/cate/ds/esa_cci_odp.py
+++ b/cate/ds/esa_cci_odp.py
@@ -58,7 +58,7 @@
 from cate.conf import get_config_value
 from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
 from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, \
-    open_xarray_dataset, get_data_stores_path, find_data_sources
+    open_xarray_dataset, get_data_stores_path
 from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike, VarNames
 from cate.ds.local import add_to_data_store_registry, LocalDataSource
 from cate.util.monitor import Monitor
@@ -591,7 +591,14 @@ def update_local(self,
                      local_id: str,
                      time_range: TimeRangeLike.TYPE,
                      monitor: Monitor = Monitor.NONE) -> bool:
-        data_sources = find_data_sources(id=local_id)  # type: Sequence['DataSource']
+        local_store = DATA_STORE_REGISTRY.get_data_store('local')
+        if not local_store:
+            add_to_data_store_registry()
+            local_store = DATA_STORE_REGISTRY.get_data_store('local')
+        if not local_store:
+            raise ValueError('Cannot initialize `local` DataStore')
+
+        data_sources = local_store.query(id=local_id)  # type: Sequence['DataSource']
         data_source = next((ds for ds in data_sources if isinstance(ds, LocalDataSource)
                             and ds.id == local_id), None)  # type: LocalDataSource
         if not data_source:
@@ -621,9 +628,11 @@ def update_local(self,
         if to_add:
             for time_range_to_add in to_add:
                 self._make_local(data_source, time_range_to_add, None, data_source.variables_info, monitor)
+            data_source.meta_info['temporal_coverage_start'] = time_range[0]
+            data_source.meta_info['temporal_coverage_end'] = time_range[1]
+            data_source.update_temporal_coverage(time_range)
-        # TODO (chris): forman added False (?) to make signature happy
-        return False
+        return bool(to_remove or to_add)
 
     def delete_local(self,
                      time_range: TimeRangeLike.TYPE) -> int:
@@ -966,11 +975,16 @@ def make_local(self,
         local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
                                                   time_range, var_names,
                                                   meta_info=local_meta_info, lock_file=True)
-        self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
-        if local_ds.is_empty:
-            local_store.remove_data_source(local_ds)
-            return None
-        return local_ds
+        if local_ds:
+            if not local_ds.is_complete:
+                self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
+
+            if local_ds.is_empty:
+                local_store.remove_data_source(local_ds)
+                return None
+            local_store.register_ds(local_ds)
+            return local_ds
+        return None
 
     def _init_file_list(self, monitor: Monitor = Monitor.NONE):
         if self._file_list:
diff --git a/cate/ds/local.py b/cate/ds/local.py
index d730b1092..75b4409d1 100644
--- a/cate/ds/local.py
+++ b/cate/ds/local.py
@@ -52,7 +52,7 @@
 from cate.conf import get_config_value
 from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
-from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, open_xarray_dataset, find_data_sources
+from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, open_xarray_dataset
 from cate.core.ds import get_data_stores_path
 from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike
 from cate.util.monitor import Monitor
@@ -127,6 +127,7 @@ def __init__(self,
         self._reference_name = reference_name
         self._meta_info = meta_info if meta_info else OrderedDict()
+        self._is_complete = True
 
     def _resolve_file_path(self, path) -> Sequence:
         return glob(os.path.join(self._data_store.data_store_path, path))
@@ -351,18 +352,31 @@ def make_local(self,
         local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
                                                   meta_info=self.meta_info)
-        self._make_local(local_ds, time_range, region, var_names, monitor)
-        if local_ds.is_empty:
-            local_store.remove_data_source(local_ds)
-            return None
-        return local_ds
+        if local_ds:
+            if not local_ds.is_complete:
+                self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
+
+            if local_ds.is_empty:
+                local_store.remove_data_source(local_ds)
+                return None
+
+            local_store.register_ds(local_ds)
+            return local_ds
+        return None
 
     def update_local(self,
                      local_id: str,
                      time_range: TimeRangeLike.TYPE,
                      monitor: Monitor = Monitor.NONE) -> bool:
-        data_sources = find_data_sources(id=local_id)  # type: Sequence['DataSource']
+        local_store = DATA_STORE_REGISTRY.get_data_store('local')
+        if not local_store:
+            add_to_data_store_registry()
+            local_store = DATA_STORE_REGISTRY.get_data_store('local')
+        if not local_store:
+            raise ValueError('Cannot initialize `local` DataStore')
+
+        data_sources = local_store.query(id=local_id)  # type: Sequence['DataSource']
         data_source = next((ds for ds in data_sources if isinstance(ds, LocalDataSource)
                             and ds.id == local_id), None)  # type: LocalDataSource
         if not data_source:
@@ -391,6 +405,10 @@ def update_local(self,
         if to_add:
             for time_range_to_add in to_add:
                 self._make_local(data_source, time_range_to_add, None, data_source.variables_info, monitor)
+            data_source.meta_info['temporal_coverage_start'] = time_range[0]
+            data_source.meta_info['temporal_coverage_end'] = time_range[1]
+            data_source.update_temporal_coverage(time_range)
+        return bool(to_remove or to_add)
 
     def add_dataset(self, file, time_coverage: TimeRangeLike.TYPE = None, update: bool = False,
@@ -424,6 +442,15 @@ def _extend_temporal_coverage(self, time_range: TimeRangeLike.TYPE):
                 self._temporal_coverage = tuple([time_range[0], self._temporal_coverage[1]])
         else:
             self._temporal_coverage = time_range
+        self.save()
+
+    def update_temporal_coverage(self, time_range: TimeRangeLike.TYPE):
+        """
+
+        :param time_range: Time range to be added to data source temporal coverage
+        :return:
+        """
+        self._extend_temporal_coverage(time_range)
 
     def _reduce_temporal_coverage(self, time_range: TimeRangeLike.TYPE):
         """
@@ -488,6 +515,14 @@ def variables_info(self):
     def info_string(self):
         return 'Files: %s' % (' '.join(self._files))
 
+    @property
+    def is_complete(self) -> bool:
+        """
+        Return a DataSource creation state
+        :return:
+        """
+        return self._is_complete
+
     @property
     def is_empty(self) -> bool:
         """
@@ -496,6 +531,14 @@ def is_empty(self) -> bool:
         """
         return not self._files or len(self._files) == 0
 
+    def set_completed(self, state: bool):
+        """
+        Sets state of DataSource creation/completion
+        :param state: Is DataSource completed
+        :return:
+        """
+        self._is_complete = state
+
     def _repr_html_(self):
         import html
         return '\n' \
@@ -579,6 +622,7 @@ def add_pattern(self, data_source_id: str, files: Union[str, Sequence[str]] = No
                 is_first_file = False
             else:
                 data_source.add_dataset(file)
+        self.register_ds(data_source)
         return data_source
 
     def remove_data_source(self, data_source: Union[str, DataSource], remove_files:
@@ -591,7 +635,12 @@ def remove_data_source(self, data_source: Union[str, DataSource], remove_files:
             os.remove(file_name)
         if remove_files:
             shutil.rmtree(os.path.join(self._store_dir, data_source.id), ignore_errors=True)
-        self._data_sources.remove(data_source)
+        if data_source in self._data_sources:
+            self._data_sources.remove(data_source)
+
+    def register_ds(self, data_source: DataSource):
+        data_source.set_completed(True)
+        self._data_sources.append(data_source)
 
     def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = None,
                            reference_type: str = None, reference_name: str = None,
@@ -602,7 +651,7 @@ def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = Non
             data_source_id = '%s.%s' % (self.id, data_source_id)
         lock_filename = '{}.lock'.format(data_source_id)
         lock_filepath = os.path.join(self._store_dir, lock_filename)
-        existing_ds = None
+        data_source = None
         for ds in self._data_sources:
             if ds.id == data_source_id:
                 if lock_file and os.path.isfile(lock_filepath):
@@ -614,23 +663,23 @@ def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = Non
                 #  ds.temporal_coverage() == time_range and
                 if ds.spatial_coverage() == region \
                         and ds.variables_info == var_names:
-                    existing_ds = ds
+                    data_source = ds
+                    data_source.set_completed(False)
                     break
                 raise ValueError("Local data store '{}' already contains a data source named '{}'"
                                  .format(self.id, data_source_id))
-        if existing_ds:
-            data_source = existing_ds
-        else:
+        if not data_source:
             data_source = LocalDataSource(data_source_id, files=[], data_store=self,
                                           spatial_coverage=region,
                                           reference_type=reference_type, reference_name=reference_name,
                                           meta_info=meta_info)
+        data_source.set_completed(False)
+        self._save_data_source(data_source)
+
         if lock_file:
             pid = os.getpid()
             with open(lock_filepath, 'w') as lock_file:
                 lock_file.write(str(pid))
-        self._save_data_source(data_source)
-        self._data_sources.append(data_source)
         return data_source
 
     @property