diff --git a/CHANGES.md b/CHANGES.md
index efa32a085..df211e32f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -44,6 +44,16 @@
### Issues Fixed/Resolved
+* Fixed [#299](https://github.com/CCI-Tools/cate-core/issues/299)
+ * renamed property `cate.core.ds.DataSource.name` to `id`
+ * renamed property `cate.core.ds.DataStore.name` to `id`
+ * renamed and changed signature of function `cate.core.ds.query_data_sources(..., name=None)`
+ to `find_data_sources(..., id=None, query_expr=None)` (see the example sketch below)
+ * changed signature of method `cate.core.ds.DataStore.query(name, ...)` to `query(id=None, query_expr=None, ...)`
+ * renamed and changed signature of method `cate.core.ds.DataSource.matches_filter(name)` to `matches(id=None, query_expr=None)`
+ * added `title` property to `cate.core.ds.DataStore` and `cate.core.ds.DataSource`
+ * made use of the new `id` and `title` properties of both `DataStore` and `DataSource` in their
+ JSON representations.
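+ * Example of adapting calling code (a sketch; `'some.data.source'` is a placeholder identifier):
+
+ ```python
+ # before (old API)
+ sources = query_data_sources(name='some.data.source')
+ ds_id = sources[0].name
+ # after (new API)
+ sources = find_data_sources(id='some.data.source')
+ ds_id = sources[0].id
+ ```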
* Fixed [#294](https://github.com/CCI-Tools/cate-core/issues/294)
* Fixed [#286](https://github.com/CCI-Tools/cate-core/issues/286)
* Fixed [#285](https://github.com/CCI-Tools/cate-core/issues/285)
diff --git a/cate/cli/main.py b/cate/cli/main.py
index d108d0e6f..0d64f7425 100644
--- a/cate/cli/main.py
+++ b/cate/cli/main.py
@@ -19,9 +19,6 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
-__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
- "Marco Zühlke (Brockmann Consult GmbH)"
-
"""
Description
===========
@@ -108,7 +105,7 @@
from cate.conf.defaults import WEBAPI_INFO_FILE, WEBAPI_ON_INACTIVITY_AUTO_STOP_AFTER
from cate.core.types import Like, TimeRangeLike
-from cate.core.ds import DATA_STORE_REGISTRY, query_data_sources
+from cate.core.ds import DATA_STORE_REGISTRY, find_data_sources
from cate.core.objectio import OBJECT_IO_REGISTRY, find_writer, read_object
from cate.core.op import OP_REGISTRY
from cate.core.plugin import PLUGIN_REGISTRY
@@ -123,6 +120,8 @@
from cate.webapi.wsmanag import WebAPIWorkspaceManager
from cate.version import __version__
+__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
+ "Marco Zühlke (Brockmann Consult GmbH)"
#: Name of the Cate CLI executable (= ``cate``).
CLI_NAME = 'cate'
@@ -287,18 +286,23 @@ def _parse_op_args(raw_args: List[str],
else:
# For any non-None value and any data type we perform basic type validation:
if value is not None and data_type:
+ # noinspection PyTypeChecker
if issubclass(data_type, Like):
- # For XXXLike-types call accepts()
+ # noinspection PyUnresolvedReferences
compatible = data_type.accepts(value)
else:
+ # noinspection PyTypeChecker
compatible = isinstance(value, data_type)
if not compatible:
+ # noinspection PyTypeChecker
if issubclass(data_type, float):
# Allow assigning bool and int to a float
compatible = isinstance(value, bool) or isinstance(value, int)
+ # noinspection PyTypeChecker
elif issubclass(data_type, int):
# Allow assigning bool and float to an int
compatible = isinstance(value, bool) or isinstance(value, float)
+ # noinspection PyTypeChecker
elif issubclass(data_type, bool):
# Allow assigning anything to a bool
compatible = True
@@ -396,7 +400,8 @@ def _get_op_info_str(op_meta_info: OpMetaInfo):
op_info_str += '\n'
op_info_str += _get_op_io_info_str(op_meta_info.inputs, 'Input', 'Inputs', 'Operation does not have any inputs.')
- op_info_str += _get_op_io_info_str(op_meta_info.outputs, 'Output', 'Outputs', 'Operation does not have any outputs.')
+ op_info_str += _get_op_io_info_str(op_meta_info.outputs, 'Output', 'Outputs',
+ 'Operation does not have any outputs.')
return op_info_str
@@ -558,12 +563,12 @@ def execute(self, command_args):
OP_ARGS_RES_HELP = 'Operation arguments given as KEY=VALUE. KEY is any supported input by OP. VALUE ' \
- 'depends on the expected data type of an OP input. It can be either a value or ' \
- 'a reference an existing resource prefixed by the add character "@". ' \
- 'The latter connects to operation steps with each other. To provide a (constant)' \
- 'value you can use boolean literals True and False, strings, or numeric values. ' \
- 'Type "cate op info OP" to print information about the supported OP ' \
- 'input names to be used as KEY and their data types to be used as VALUE. '
+ 'depends on the expected data type of an OP input. It can be either a value or ' \
+ 'a reference to an existing resource prefixed by the "@" character. ' \
+ 'The latter connects operation steps with each other. To provide a (constant) ' \
+ 'value you can use the boolean literals True and False, strings, or numeric values. ' \
+ 'Type "cate op info OP" to print information about the supported OP ' \
+ 'input names to be used as KEY and their data types to be used as VALUE. '
class WorkspaceCommand(SubCommandCommand):
@@ -920,10 +925,13 @@ def _execute_open(cls, command_args):
workspace_manager = _new_workspace_manager()
op_args = dict(ds_name=command_args.ds_name)
if command_args.var_names:
+ # noinspection PyArgumentList
op_args.update(var_names=command_args.var_names)
if command_args.region:
+ # noinspection PyArgumentList
op_args.update(region=command_args.region)
if command_args.start_date or command_args.end_date:
+ # noinspection PyArgumentList
op_args.update(time_range="%s,%s" % (command_args.start_date or '',
command_args.end_date or ''))
workspace_manager.set_workspace_resource(_base_dir(command_args.base_dir),
@@ -937,6 +945,7 @@ def _execute_read(cls, command_args):
workspace_manager = _new_workspace_manager()
op_args = dict(file=command_args.file_path)
if command_args.format_name:
+ # noinspection PyArgumentList
op_args.update(format=command_args.format_name)
workspace_manager.set_workspace_resource(_base_dir(command_args.base_dir),
command_args.res_name,
@@ -1154,23 +1163,23 @@ def configure_parser_and_subparsers(cls, parser, subparsers):
def _execute_list(cls, command_args):
ds_name = command_args.name
if command_args.coverage:
- ds_names = OrderedDict(sorted(((ds.name, TimeRangeLike.format(ds.temporal_coverage())
+ ds_names = OrderedDict(sorted(((ds.id, TimeRangeLike.format(ds.temporal_coverage())
if ds.temporal_coverage() else None)
- for ds in query_data_sources()),
+ for ds in find_data_sources()),
key=lambda item: item[0]))
else:
- ds_names = sorted(data_source.name for data_source in query_data_sources())
+ ds_names = sorted(data_source.id for data_source in find_data_sources())
_list_items('data source', 'data sources', ds_names, ds_name)
@classmethod
def _execute_info(cls, command_args):
ds_name = command_args.ds_name
- data_sources = [data_source for data_source in query_data_sources(name=ds_name) if data_source.name == ds_name]
+ data_sources = [data_source for data_source in find_data_sources(id=ds_name) if data_source.id == ds_name]
if not data_sources:
raise CommandError('data source "%s" not found' % ds_name)
data_source = data_sources[0]
- title = 'Data source %s' % data_source.name
+ title = 'Data source %s' % data_source.id
print()
print(title)
print('=' * len(title))
@@ -1197,7 +1206,7 @@ def _execute_add(cls, command_args):
ds_name = command_args.ds_name
files = command_args.file
ds = local_store.add_pattern(ds_name, files)
- print("Local data source with name '%s' added." % ds.name)
+ print("Local data source with name '%s' added." % ds.id)
@classmethod
def _execute_del(cls, command_args):
@@ -1213,7 +1222,7 @@ def _execute_del(cls, command_args):
if not answer or answer.lower() == 'y':
keep_files = command_args.keep_files
ds = local_store.remove_data_source(ds_name, not keep_files)
- print("Local data source with name '%s' removed." % ds.name)
+ print("Local data source with name '%s' removed." % ds.id)
@classmethod
def _execute_copy(cls, command_args):
@@ -1222,7 +1231,7 @@ def _execute_copy(cls, command_args):
raise RuntimeError('internal error: no local data store found')
ds_name = command_args.ref_ds
- data_source = next(iter(query_data_sources(None, ds_name)), None)
+ data_source = next(iter(find_data_sources(None, id=ds_name)), None)
if data_source is None:
raise RuntimeError('internal error: no local data source found: %s' % ds_name)
@@ -1234,7 +1243,7 @@ def _execute_copy(cls, command_args):
ds = data_source.make_local(local_name, None, time_range=time_range, region=region, var_names=var_names,
monitor=cls.new_monitor())
- print("Local data source with name '%s' has been created." % ds.name)
+ print("Local data source with name '%s' has been created." % ds.id)
class PluginCommand(SubCommandCommand):
@@ -1295,11 +1304,13 @@ def _trim_error_message(message: str) -> str:
# use by 'sphinxarg' to generate the documentation
def _make_cate_parser():
from cate.util.cli import _make_parser
+ # noinspection PyTypeChecker
return _make_parser(CLI_NAME, CLI_DESCRIPTION, __version__, COMMAND_REGISTRY, license_text=_LICENSE,
docs_url=_DOCS_URL)
def main(args=None) -> int:
+ # noinspection PyTypeChecker
return run_main(CLI_NAME,
CLI_DESCRIPTION,
__version__,
diff --git a/cate/core/__init__.py b/cate/core/__init__.py
index 1f64dee98..9a122fe8f 100644
--- a/cate/core/__init__.py
+++ b/cate/core/__init__.py
@@ -24,7 +24,7 @@
"""
# noinspection PyUnresolvedReferences
-from .ds import DataStore, DataSource, open_dataset, query_data_sources, DATA_STORE_REGISTRY
+from .ds import DataStore, DataSource, open_dataset, find_data_sources, DATA_STORE_REGISTRY
# noinspection PyUnresolvedReferences
from .op import op, op_input, op_output, op_return, Operation, OP_REGISTRY, \
diff --git a/cate/core/ds.py b/cate/core/ds.py
index c20c162e0..a651aa07f 100644
--- a/cate/core/ds.py
+++ b/cate/core/ds.py
@@ -92,7 +92,6 @@
from .types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike
from ..util import Monitor
-
__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
"Marco Zühlke (Brockmann Consult GmbH), " \
"Chris Bernat (Telespazio VEGA UK Ltd)"
@@ -116,8 +115,8 @@ class DataSource(metaclass=ABCMeta):
@property
@abstractmethod
- def name(self) -> str:
- """Human-readable data source name."""
+ def id(self) -> str:
+ """Data source identifier."""
@property
def schema(self) -> Optional[Schema]:
@@ -146,9 +145,18 @@ def protocols(self) -> []:
def data_store(self) -> 'DataStore':
"""The data store to which this data source belongs."""
- def matches_filter(self, name=None) -> bool:
- """Test if this data source matches the given *constraints*."""
- if name and name.lower() not in self.name.lower():
+ def matches(self, id: str = None, query_expr: str = None) -> bool:
+ """
+ Test if this data source matches the given *id* or *query_expr*.
+ If neither *id* nor *query_expr* is given, the method returns True.
+
+ :param id: A data source identifier.
+ :param query_expr: A query expression. Not yet supported.
+ :return: True, if this data source matches the given *id* or *query_expr*.
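+
+ For example (a sketch, with ``'ozone'`` standing in for any search string),
+ ``ds.matches(id='ozone')`` returns True if ``'ozone'`` occurs in ``ds.id``,
+ ignoring case.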
+ """
+ if query_expr:
+ raise NotImplementedError('query_expr not yet supported')
+ if id and id.lower() not in self.id.lower():
return False
return True
@@ -258,7 +266,16 @@ def delete_local(self,
return 0
@property
- def meta_info(self) -> Union[dict, None]:
+ def title(self) -> Optional[str]:
+ """
+ Human-readable data source title.
+ The default implementation tries to retrieve the title from ``meta_info['title']``.
+ """
+ meta_info = self.meta_info
+ return meta_info.get('title') if meta_info else None
+
+ @property
+ def meta_info(self) -> Optional[dict]:
"""
Return meta-information about this data source.
The returned dict, if any, is JSON-serializable.
@@ -266,7 +283,7 @@ def meta_info(self) -> Union[dict, None]:
return None
@property
- def cache_info(self) -> Union[dict, None]:
+ def cache_info(self) -> Optional[dict]:
"""
Return information about cached, locally available data sets.
The returned dict, if any, is JSON-serializable.
@@ -274,7 +291,7 @@ def cache_info(self) -> Union[dict, None]:
return None
@property
- def variables_info(self) -> Union[dict, None]:
+ def variables_info(self) -> Optional[dict]:
"""
Return meta-information about the variables contained in this data source.
The returned dict, if any, is JSON-serializable.
@@ -282,7 +299,7 @@ def variables_info(self) -> Union[dict, None]:
return None
@property
- def info_string(self):
+ def info_string(self) -> str:
"""
Return a textual representation of the meta-information about this data source.
Useful for CLI / REPL applications.
@@ -303,8 +320,9 @@ def info_string(self):
return '\n'.join(info_lines)
+ # TODO (forman): No overrides! Remove from DataSource interface, turn into utility function instead
@property
- def variables_info_string(self):
+ def variables_info_string(self) -> str:
"""
Return some textual information about the variables contained in this data source.
Useful for CLI / REPL applications.
@@ -323,8 +341,9 @@ def variables_info_string(self):
return '\n'.join(info_lines)
+ # TODO (forman): No overrides! Remove from DataSource interface, turn into utility function instead
@property
- def cached_datasets_coverage_string(self):
+ def cached_datasets_coverage_string(self) -> str:
"""
Return a textual representation of information about cached, locally available data sets.
Useful for CLI / REPL applications.
@@ -350,17 +369,30 @@ def _repr_html_(self):
class DataStore(metaclass=ABCMeta):
- """Represents a data store of data sources."""
+ """
+ Represents a data store of data sources.
+
+ :param id: Unique data store identifier.
+ :param title: A human-readable title.
+ """
+
+ def __init__(self, id: str, title: str = None):
+ self._id = id
+ self._title = title or id
- def __init__(self, name: str):
- self._name = name
+ @property
+ def id(self) -> str:
+ """
+ Return the unique identifier for this data store.
+ """
+ return self._id
@property
- def name(self) -> str:
+ def title(self) -> str:
"""
- Return the name of this data store.
+ Return a human-readable title for this data store.
"""
- return self._name
+ return self._title
@property
def data_store_path(self) -> Optional[str]:
@@ -370,11 +402,12 @@ def data_store_path(self) -> Optional[str]:
return None
@abstractmethod
- def query(self, name=None, monitor: Monitor = Monitor.NONE) -> Sequence[DataSource]:
+ def query(self, id: str = None, query_expr: str = None, monitor: Monitor = Monitor.NONE) -> Sequence[DataSource]:
"""
Retrieve data sources in this data store using the given constraints.
- :param name: Name of the data source.
+ :param id: Data source identifier.
+ :param query_expr: Query expression which may be used if *id* is unknown.
:param monitor: A progress monitor.
:return: Sequence of data sources.
"""
@@ -403,17 +436,17 @@ class DataStoreRegistry:
def __init__(self):
self._data_stores = dict()
- def get_data_store(self, name: str) -> Optional[DataStore]:
- return self._data_stores.get(name, None)
+ def get_data_store(self, id: str) -> Optional[DataStore]:
+ return self._data_stores.get(id)
def get_data_stores(self) -> Sequence[DataStore]:
return list(self._data_stores.values())
def add_data_store(self, data_store: DataStore):
- self._data_stores[data_store.name] = data_store
+ self._data_stores[data_store.id] = data_store
- def remove_data_store(self, name: str):
- del self._data_stores[name]
+ def remove_data_store(self, id: str):
+ del self._data_stores[id]
def __len__(self):
return len(self._data_stores)
@@ -427,8 +460,8 @@ def __repr__(self):
def _repr_html_(self):
rows = []
- for name, data_store in self._data_stores.items():
- rows.append('<tr><td>%s</td><td>%s</td></tr>' % (name, repr(data_store)))
+ for id, data_store in self._data_stores.items():
+ rows.append('<tr><td>%s</td><td>%s</td></tr>' % (id, repr(data_store)))
return '<table>%s</table>' % '\n'.join(rows)
@@ -437,13 +470,17 @@ def _repr_html_(self):
DATA_STORE_REGISTRY = DataStoreRegistry()
-def query_data_sources(data_stores: Union[DataStore, Sequence[DataStore]] = None, name=None) -> Sequence[DataSource]:
- """Query the data store(s) for data sources matching the given constrains.
+def find_data_sources(data_stores: Union[DataStore, Sequence[DataStore]] = None,
+ id: str = None,
+ query_expr: str = None) -> Sequence[DataSource]:
+ """
+ Find data sources in the given data store(s) matching the given *id* or *query_expr*.
See also :py:func:`open_dataset`.
:param data_stores: If given these data stores will be queried. Otherwise all registered data stores will be used.
- :param name: The name of a data source.
+ :param id: A data source identifier.
+ :param query_expr: A query expression.
:return: All data sources matching the given constrains.
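+
+ Usage sketch (the identifier below is a placeholder; it is matched against the
+ identifiers of data sources in the registered data stores)::
+
+ data_sources = find_data_sources(id='esacci.OZONE')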
"""
results = []
@@ -455,21 +492,21 @@ def query_data_sources(data_stores: Union[DataStore, Sequence[DataStore]] = None
primary_data_store = data_stores
else:
data_store_list = data_stores
- if not primary_data_store and name and name.count('.') > 0:
+ if not primary_data_store and id and id.count('.') > 0:
primary_data_store_index = -1
- primary_data_store_name, data_source_name = name.split('.', 1)
+ primary_data_store_id, data_source_name = id.split('.', 1)
for idx, data_store in enumerate(data_store_list):
- if data_store.name == primary_data_store_name:
+ if data_store.id == primary_data_store_id:
primary_data_store_index = idx
if primary_data_store_index >= 0:
primary_data_store = data_store_list.pop(primary_data_store_index)
if primary_data_store:
- results.extend(primary_data_store.query(name))
+ results.extend(primary_data_store.query(id=id, query_expr=query_expr))
if not results:
# noinspection PyTypeChecker
for data_store in data_store_list:
- results.extend(data_store.query(name))
+ results.extend(data_store.query(id=id, query_expr=query_expr))
return results
@@ -494,7 +531,7 @@ def open_dataset(data_source: Union[DataSource, str],
if isinstance(data_source, str):
data_store_list = list(DATA_STORE_REGISTRY.get_data_stores())
- data_sources = query_data_sources(data_store_list, name=data_source)
+ data_sources = find_data_sources(data_store_list, id=data_source)
if len(data_sources) == 0:
raise ValueError("No data_source found for the given query term", data_source)
elif len(data_sources) > 1:
diff --git a/cate/ds/esa_cci_ftp.py b/cate/ds/esa_cci_ftp.py
index cfd3ba0f5..139459e5f 100644
--- a/cate/ds/esa_cci_ftp.py
+++ b/cate/ds/esa_cci_ftp.py
@@ -20,10 +20,6 @@
# SOFTWARE.
-__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
- "Marco Zühlke (Brockmann Consult GmbH), " \
- "Chris Bernat (Telespacio VEGA UK Inc.)"
-
"""
Description
===========
@@ -61,6 +57,10 @@
from cate.core.types import PolygonLike, TimeRangeLike, VarNamesLike
from cate.util import to_datetime, Monitor, Cancellation
+__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
+ "Marco Zühlke (Brockmann Consult GmbH), " \
+ "Chris Bernat (Telespacio VEGA UK Inc.)"
+
Time = Union[str, datetime]
TimeRange = Tuple[Time, Time]
@@ -85,7 +85,7 @@ class FileSetDataSource(DataSource):
Parameters
----------
- name : str
+ id : str
The name of the file set
base_dir : str
The base directory
@@ -101,19 +101,19 @@ class FileSetDataSource(DataSource):
def __init__(self,
file_set_data_store: 'FileSetDataStore',
- name: str,
+ id: str,
base_dir: str,
file_pattern: str,
fileset_info: 'FileSetInfo' = None):
self._file_set_data_store = file_set_data_store
- self._name = name
+ self._id = id
self._base_dir = base_dir
self._file_pattern = file_pattern
self._fileset_info = fileset_info
@property
- def name(self):
- return self._name
+ def id(self):
+ return self._id
@property
def schema(self) -> Schema:
@@ -153,7 +153,7 @@ def to_json_dict(self):
:return: A JSON-serializable dictionary
"""
fsds_dict = OrderedDict()
- fsds_dict['name'] = self.name
+ fsds_dict['name'] = self.id
fsds_dict['base_dir'] = self._base_dir
fsds_dict['file_pattern'] = self._file_pattern
if self._fileset_info:
@@ -234,7 +234,7 @@ def sync(self,
num_of_synchronised_files = 0
num_of_expected_remote_files = len(list(chain.from_iterable(list(expected_remote_files.values()))))
- with monitor.starting('Sync %s' % self._name, num_of_expected_remote_files):
+ with monitor.starting('Sync %s' % self._id, num_of_expected_remote_files):
try:
with ftplib.FTP(ftp_host_name) as ftp:
ftp.login()
@@ -318,7 +318,7 @@ def _get_expected_remote_files(self, time_range: TimeRange = (None, None)) -> Ma
return expected_remote_files
def __repr__(self):
- return "FileSetDataSource(%s, %s, %s)" % (repr(self._name), repr(self._base_dir), repr(self._file_pattern))
+ return "FileSetDataSource(%s, %s, %s)" % (repr(self._id), repr(self._base_dir), repr(self._file_pattern))
@property
def info_string(self):
@@ -338,7 +338,7 @@ def _repr_html_(self):
return '<table>%s</table>' % rows
def get_table_data(self):
- return OrderedDict([('Name', self._name),
+ return OrderedDict([('Name', self._id),
('Base directory', self._base_dir),
('File pattern', self._file_pattern)])
@@ -524,8 +524,8 @@ class FileSetDataStore(DataStore):
:param remote_url: Optional URL of the data store's remote service.
"""
- def __init__(self, name: str, root_dir: str, remote_url: str = None):
- super().__init__(name)
+ def __init__(self, id: str, root_dir: str, remote_url: str = None):
+ super().__init__(id)
self._root_dir = root_dir
self._remote_url = remote_url
self._data_sources = []
@@ -544,8 +544,10 @@ def remote_url(self) -> str:
"""Optional URL of the data store's remote service."""
return self._remote_url
- def query(self, name=None, monitor: Monitor = Monitor.NONE) -> Sequence[DataSource]:
- return [ds for ds in self._data_sources if ds.matches_filter(name)]
+ def query(self, id: str = None, query_expr: str = None, monitor: Monitor = Monitor.NONE) -> Sequence[DataSource]:
+ if id or query_expr:
+ return [ds for ds in self._data_sources if ds.matches(id=id, query_expr=query_expr)]
+ return self._data_sources
def load_from_json(self, json_fp_or_str: Union[str, IOBase]):
if isinstance(json_fp_or_str, str):
diff --git a/cate/ds/esa_cci_odp.py b/cate/ds/esa_cci_odp.py
index cf793430e..da0645163 100644
--- a/cate/ds/esa_cci_odp.py
+++ b/cate/ds/esa_cci_odp.py
@@ -18,9 +18,6 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
-__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
- "Marco Zühlke (Brockmann Consult GmbH), " \
- "Chris Bernat (Telespazio VEGA UK Ltd)"
"""
Description
@@ -59,11 +56,15 @@
from cate.conf import get_config_value
from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, \
- open_xarray_dataset, get_data_stores_path, query_data_sources
+ open_xarray_dataset, get_data_stores_path, find_data_sources
from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike
from cate.ds.local import add_to_data_store_registry, LocalDataSource
from cate.util.monitor import Monitor
+__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
+ "Marco Zühlke (Brockmann Consult GmbH), " \
+ "Chris Bernat (Telespazio VEGA UK Ltd)"
+
_ESGF_CEDA_URL = "https://esgf-index1.ceda.ac.uk/esg-search/search/"
_CSW_CEDA_URL = "http://csw1.cems.rl.ac.uk/geonetwork-CEDA/srv/eng/csw-CEDA-CCI"
@@ -141,6 +142,7 @@ def _fetch_solr_json(base_url, query_args, offset=0, limit=3500, timeout=10, mon
while True:
monitor.progress(work=1)
paging_query_args = dict(query_args or {})
+ # noinspection PyArgumentList
paging_query_args.update(offset=offset, limit=limit, format='application/solr+json')
url = base_url + '?' + urllib.parse.urlencode(paging_query_args)
with urllib.request.urlopen(url, timeout=timeout) as response:
@@ -205,6 +207,7 @@ def _load_or_fetch_json(fetch_json_function,
if json_obj is None:
# noinspection PyArgumentList
try:
+ # noinspection PyArgumentList
json_obj = fetch_json_function(*(fetch_json_args or []), **(fetch_json_kwargs or {}))
if cache_used:
os.makedirs(cache_dir, exist_ok=True)
@@ -277,11 +280,12 @@ def pick_start_time(file_info_rec):
class EsaCciOdpDataStore(DataStore):
def __init__(self,
- name: str = 'esa_cci_odp',
+ id: str = 'esa_cci_odp',
+ title: str = 'ESA CCI Open Data Portal',
index_cache_used: bool = True,
index_cache_expiration_days: float = 1.0,
index_cache_json_dict: dict = None):
- super().__init__(name)
+ super().__init__(id, title=title)
self._index_cache_used = index_cache_used
self._index_cache_expiration_days = index_cache_expiration_days
self._index_json_dict = index_cache_json_dict
@@ -313,13 +317,11 @@ def update_indices(self, update_file_lists: bool = False, monitor: Monitor = Mon
data_source.update_file_list()
child_monitor.progress(work=1)
- def query(self, name: str = None, monitor: Monitor = Monitor.NONE) -> Sequence['DataSource']:
+ def query(self, id: str = None, query_expr: str = None, monitor: Monitor = Monitor.NONE) -> Sequence['DataSource']:
self._init_data_sources()
- if name:
- result = [data_source for data_source in self._data_sources if data_source.matches_filter(name)]
- else:
- result = self._data_sources
- return result
+ if id or query_expr:
+ return [ds for ds in self._data_sources if ds.matches(id=id, query_expr=query_expr)]
+ return self._data_sources
def _repr_html_(self) -> str:
self._init_data_sources()
@@ -440,7 +442,7 @@ def __init__(self,
self._meta_info = None
@property
- def name(self) -> str:
+ def id(self) -> str:
return self._master_id
@property
@@ -456,7 +458,6 @@ def spatial_coverage(self) -> Optional[PolygonLike]:
if self._catalogue_data \
and self._catalogue_data.get('bbox_minx', None) and self._catalogue_data.get('bbox_miny', None) \
and self._catalogue_data.get('bbox_maxx', None) and self._catalogue_data.get('bbox_maxy', None):
-
return PolygonLike.convert([
self._catalogue_data.get('bbox_minx'),
self._catalogue_data.get('bbox_miny'),
@@ -574,9 +575,9 @@ def update_local(self,
time_range: TimeRangeLike.TYPE,
monitor: Monitor = Monitor.NONE) -> bool:
- data_sources = query_data_sources(None, local_id) # type: Sequence['DataSource']
+ data_sources = find_data_sources(None, id=local_id) # type: Sequence['DataSource']
data_source = next((ds for ds in data_sources if isinstance(ds, LocalDataSource) and
- ds.name == local_id), None) # type: LocalDataSource
+ ds.id == local_id), None) # type: LocalDataSource
if not data_source:
raise ValueError("Couldn't find local DataSource", (local_id, data_sources))
@@ -600,11 +601,14 @@ def update_local(self,
if to_remove:
for time_range_to_remove in to_remove:
data_source.reduce_temporal_coverage(time_range_to_remove)
- if to_add:
+ if to_add:
for time_range_to_add in to_add:
self._make_local(data_source, time_range_to_add, None, data_source.variables_info, monitor)
+ # TODO (chris): forman added False (?) to make signature happy
+ return False
+
def delete_local(self, time_range: TimeRangeLike.TYPE) -> int:
if time_range[0] >= self._temporal_coverage[0] \
@@ -652,7 +656,7 @@ def open_dataset(self,
selected_file_list = self._find_files(time_range)
if not selected_file_list:
- msg = 'Data source \'{}\' does not seem to have any data files'.format(self.name)
+ msg = 'Data source \'{}\' does not seem to have any data files'.format(self.id)
if time_range is not None:
msg += ' in given time range {}'.format(TimeRangeLike.format(time_range))
raise IOError(msg)
@@ -694,8 +698,7 @@ def _make_local(self,
var_names: VarNamesLike.TYPE = None,
monitor: Monitor = Monitor.NONE):
- # local_name = local_ds.name
- local_id = local_ds.name
+ local_id = local_ds.id
time_range = TimeRangeLike.convert(time_range) if time_range else None
region = PolygonLike.convert(region) if region else None
@@ -721,7 +724,7 @@ def _make_local(self,
if protocol == _ODP_PROTOCOL_OPENDAP:
files = self._get_urls_list(selected_file_list, protocol)
- monitor.start('Sync ' + self.name, total_work=len(files))
+ monitor.start('Sync ' + self.id, total_work=len(files))
for idx, dataset_uri in enumerate(files):
child_monitor = monitor.child(work=1)
@@ -753,8 +756,8 @@ def _make_local(self,
geo_lon_res = self._get_harmonized_coordinate_value(remote_dataset.attrs,
'geospatial_lat_resolution')
if not (isnan(geo_lat_min) or isnan(geo_lat_max) or
- isnan(geo_lon_min) or isnan(geo_lon_max) or
- isnan(geo_lat_res) or isnan(geo_lon_res)):
+ isnan(geo_lon_min) or isnan(geo_lon_max) or
+ isnan(geo_lat_res) or isnan(geo_lon_res)):
process_region = True
[lon_min, lat_min, lon_max, lat_max] = region.bounds
@@ -845,7 +848,7 @@ def _make_local(self,
outdated_file_list.append(file_rec)
if outdated_file_list:
- with monitor.starting('Sync ' + self.name, len(outdated_file_list)):
+ with monitor.starting('Sync ' + self.id, len(outdated_file_list)):
bytes_to_download = sum([file_rec[3] for file_rec in outdated_file_list])
dl_stat = _DownloadStatistics(bytes_to_download)
@@ -891,7 +894,7 @@ def make_local(self,
del local_meta_info['uuid']
local_meta_info['ref_uuid'] = self.meta_info['uuid']
- local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.name,
+ local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
time_range, var_names, meta_info=local_meta_info, lock_file=True)
self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
return local_ds
@@ -938,10 +941,10 @@ def __str__(self):
return self.info_string
def _repr_html_(self):
- return self.name
+ return self.id
def __repr__(self):
- return self.name
+ return self.id
class _DownloadStatistics:
@@ -954,7 +957,7 @@ def handle_chunk(self, chunk):
self.bytes_done += chunk
@staticmethod
- def _to_mibs(bytes_count):
+ def _to_mibs(bytes_count: int) -> float:
return bytes_count / (1024 * 1024)
def __str__(self):
@@ -968,7 +971,6 @@ def __str__(self):
class EsaCciCatalogueService:
-
def __init__(self, catalogue_url: str):
self._catalogue_url = catalogue_url
@@ -1023,20 +1025,20 @@ def _build_catalogue(self, monitor: Monitor = Monitor.NONE):
self._catalogue = {
record.identification.uricode[0]: {
- 'abstract': record.identification.abstract,
- 'bbox_minx': record.identification.bbox.minx if record.identification.bbox else None,
- 'bbox_miny': record.identification.bbox.miny if record.identification.bbox else None,
- 'bbox_maxx': record.identification.bbox.maxx if record.identification.bbox else None,
- 'bbox_maxy': record.identification.bbox.maxy if record.identification.bbox else None,
- 'creation_date':
+ 'abstract': record.identification.abstract,
+ 'bbox_minx': record.identification.bbox.minx if record.identification.bbox else None,
+ 'bbox_miny': record.identification.bbox.miny if record.identification.bbox else None,
+ 'bbox_maxx': record.identification.bbox.maxx if record.identification.bbox else None,
+ 'bbox_maxy': record.identification.bbox.maxy if record.identification.bbox else None,
+ 'creation_date':
next(iter(e.date for e in record.identification.date if e and e.type == 'creation'), None),
- 'publication_date':
+ 'publication_date':
next(iter(e.date for e in record.identification.date if e and e.type == 'publication'), None),
- 'title': record.identification.title,
- 'data_sources': record.identification.uricode[1:],
- 'licences': record.identification.uselimitation,
- 'temporal_coverage_start': record.identification.temporalextent_start,
- 'temporal_coverage_end': record.identification.temporalextent_end
+ 'title': record.identification.title,
+ 'data_sources': record.identification.uricode[1:],
+ 'licences': record.identification.uselimitation,
+ 'temporal_coverage_start': record.identification.temporalextent_start,
+ 'temporal_coverage_end': record.identification.temporalextent_end
}
for record in catalogue_metadata.values()
if record.identification and len(record.identification.uricode) > 0
diff --git a/cate/ds/local.py b/cate/ds/local.py
index f798b29ea..a9c918e6d 100644
--- a/cate/ds/local.py
+++ b/cate/ds/local.py
@@ -18,9 +18,6 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
-__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
- "Marco Zühlke (Brockmann Consult GmbH), " \
- "Chris Bernat (Telespazio VEGA UK Ltd)"
"""
Description
@@ -55,11 +52,15 @@
from cate.conf import get_config_value
from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
-from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, open_xarray_dataset, query_data_sources
+from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, open_xarray_dataset, find_data_sources
from cate.core.ds import get_data_stores_path
from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike
from cate.util.monitor import Monitor
+__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
+ "Marco Zühlke (Brockmann Consult GmbH), " \
+ "Chris Bernat (Telespazio VEGA UK Ltd)"
+
_REFERENCE_DATA_SOURCE_TYPE = "FILE_PATTERN"
@@ -72,13 +73,32 @@ def add_to_data_store_registry():
data_store = LocalDataStore('local', get_data_store_path())
DATA_STORE_REGISTRY.add_data_store(data_store)
-
+# TODO (kbernat): document this class
class LocalDataSource(DataSource):
- def __init__(self, name: str, files: Union[Sequence[str], OrderedDict], data_store: 'LocalDataStore',
- temporal_coverage: TimeRangeLike.TYPE = None, spatial_coverage: PolygonLike.TYPE = None,
- variables: VarNamesLike.TYPE = None, reference_type: str = None, reference_name: str = None,
+ """
+
+ :param id:
+ :param files:
+ :param data_store:
+ :param temporal_coverage:
+ :param spatial_coverage:
+ :param variables:
+ :param reference_type:
+ :param reference_name:
+ :param meta_info:
+ """
+
+ def __init__(self,
+ id: str,
+ files: Union[Sequence[str], OrderedDict],
+ data_store: 'LocalDataStore',
+ temporal_coverage: TimeRangeLike.TYPE = None,
+ spatial_coverage: PolygonLike.TYPE = None,
+ variables: VarNamesLike.TYPE = None,
+ reference_type: str = None,
+ reference_name: str = None,
meta_info: dict = None):
- self._name = name
+ self._id = id
if isinstance(files, Sequence):
self._files = OrderedDict.fromkeys(files)
else:
@@ -127,11 +147,10 @@ def open_dataset(self,
for i in range(len(time_series)):
if time_series[i]:
if isinstance(time_series[i], Tuple) and \
- time_series[i][0] >= time_range[0] and \
- time_series[i][1] <= time_range[1]:
+ time_series[i][0] >= time_range[0] and time_series[i][1] <= time_range[1]:
paths.extend(self._resolve_file_path(file_paths[i]))
elif isinstance(time_series[i], datetime) and \
- time_range[0] <= time_series[i] < time_range[1]:
+ time_range[0] <= time_series[i] < time_range[1]:
paths.extend(self._resolve_file_path(file_paths[i]))
else:
for file in self._files.items():
@@ -165,8 +184,7 @@ def _make_local(self,
var_names: VarNamesLike.TYPE = None,
monitor: Monitor = Monitor.NONE):
- # local_name = local_ds.name
- local_id = local_ds.name
+ local_id = local_ds.id
time_range = TimeRangeLike.convert(time_range) if time_range else None
region = PolygonLike.convert(region) if region else None
@@ -184,7 +202,7 @@ def _make_local(self,
if not os.path.exists(local_path):
os.makedirs(local_path)
- monitor.start("Sync " + self.name, total_work=len(self._files.items()))
+ monitor.start("Sync " + self.id, total_work=len(self._files.items()))
for remote_relative_filepath, coverage in self._files.items():
child_monitor = monitor.child(work=1)
@@ -227,7 +245,7 @@ def _make_local(self,
geo_lon_res = self._get_harmonized_coordinate_value(remote_dataset.attrs,
'geospatial_lat_resolution')
if not (isnan(geo_lat_min) or isnan(geo_lat_max) or isnan(geo_lon_min) or
- isnan(geo_lon_max) or isnan(geo_lat_res) or isnan(geo_lon_res)):
+ isnan(geo_lon_max) or isnan(geo_lat_res) or isnan(geo_lon_res)):
process_region = True
[lon_min, lat_min, lon_max, lat_max] = region.bounds
@@ -330,7 +348,7 @@ def make_local(self,
if not local_store:
raise ValueError('Cannot initialize `local` DataStore')
- local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.name,
+ local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
meta_info=self.meta_info)
self._make_local(local_ds, time_range, region, var_names, monitor)
return local_ds
@@ -340,9 +358,9 @@ def update_local(self,
time_range: TimeRangeLike.TYPE,
monitor: Monitor = Monitor.NONE) -> bool:
- data_sources = query_data_sources(None, local_id) # type: Sequence['DataSource']
+ data_sources = find_data_sources(None, id=local_id) # type: Sequence['DataSource']
data_source = next((ds for ds in data_sources if isinstance(ds, LocalDataSource) and
- ds.name == local_id), None) # type: LocalDataSource
+ ds.id == local_id), None) # type: LocalDataSource
if not data_source:
raise ValueError("Couldn't find local DataSource", (local_id, data_sources))
@@ -446,6 +464,18 @@ def temporal_coverage(self, monitor: Monitor = Monitor.NONE) -> Optional[TimeRan
def spatial_coverage(self):
return self._spatial_coverage
+ @property
+ def data_store(self) -> DataStore:
+ return self._data_store
+
+ @property
+ def id(self) -> str:
+ return self._id
+
+ @property
+ def meta_info(self) -> OrderedDict:
+ return self._meta_info
+
@property
def variables_info(self):
return self._variables
@@ -459,19 +489,7 @@ def _repr_html_(self):
return '<table>\n' \
'<tr><td>Name</td><td>%s</td></tr>\n' \
'<tr><td>Files</td><td>%s</td></tr>\n' \
- '</table>\n' % (html.escape(self._name), html.escape(' '.join(self._files)))
-
- @property
- def data_store(self) -> DataStore:
- return self._data_store
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def meta_info(self) -> OrderedDict:
- return self._meta_info
+ '</table>\n' % (html.escape(self._id), html.escape(' '.join(self._files)))
def to_json_dict(self):
"""
@@ -480,7 +498,7 @@ def to_json_dict(self):
:return: A JSON-serializable dictionary
"""
config = OrderedDict({
- 'name': self._name,
+ 'name': self._id,
'meta_data': {
'deprecated': 'to be merged with meta_info in the future',
'temporal_covrage': TimeRangeLike.format(self._temporal_coverage) if self._temporal_coverage else None,
@@ -521,8 +539,8 @@ def from_json_dict(cls, json_dicts: dict, data_store: 'LocalDataStore') -> Optio
file_details_length = len(files[0])
if file_details_length > 2:
files_dict = OrderedDict((item[0], (parser.parse(item[1]).replace(microsecond=0),
- parser.parse(item[2]).replace(microsecond=0))
- if item[1] and item[2] else None) for item in files)
+ parser.parse(item[2]).replace(microsecond=0))
+ if item[1] and item[2] else None) for item in files)
elif file_details_length > 0:
files_dict = OrderedDict((item[0], parser.parse(item[1]).replace(microsecond=0))
if len(item) > 1 else (item[0], None) for item in files)
@@ -533,13 +551,13 @@ def from_json_dict(cls, json_dicts: dict, data_store: 'LocalDataStore') -> Optio
class LocalDataStore(DataStore):
- def __init__(self, name: str, store_dir: str):
- super().__init__(name)
+ def __init__(self, id: str, store_dir: str):
+ super().__init__(id, title='Local Data Sources')
self._store_dir = store_dir
self._data_sources = None
- def add_pattern(self, name: str, files: Union[str, Sequence[str]] = None) -> 'DataSource':
- data_source = self.create_data_source(name)
+ def add_pattern(self, data_source_id: str, files: Union[str, Sequence[str]] = None) -> 'DataSource':
+ data_source = self.create_data_source(data_source_id)
if isinstance(files, str):
files = [files]
is_first_file = True
@@ -551,46 +569,46 @@ def add_pattern(self, name: str, files: Union[str, Sequence[str]] = None) -> 'Da
data_source.add_dataset(file)
return data_source
- def remove_data_source(self, name: str, remove_files: bool = True):
- data_sources = self.query(name)
+ def remove_data_source(self, data_source_id: str, remove_files: bool = True):
+ data_sources = self.query(id=data_source_id)
if not data_sources or len(data_sources) != 1:
return
data_source = data_sources[0]
- file_name = os.path.join(self._store_dir, data_source.name + '.json')
+ file_name = os.path.join(self._store_dir, data_source.id + '.json')
os.remove(file_name)
if remove_files:
- shutil.rmtree(os.path.join(self._store_dir, data_source.name))
+ shutil.rmtree(os.path.join(self._store_dir, data_source.id))
self._data_sources.remove(data_source)
- def create_data_source(self, name: str, region: PolygonLike.TYPE = None,
+ def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = None,
reference_type: str = None, reference_name: str = None,
time_range: TimeRangeLike.TYPE = None, var_names: VarNamesLike.TYPE = None,
meta_info: OrderedDict = None, lock_file: bool = False):
self._init_data_sources()
- if not name.startswith('%s.' % self.name):
- name = '%s.%s' % (self.name, name)
- lock_filename = '{}.lock'.format(name)
+ if not data_source_id.startswith('%s.' % self.id):
+ data_source_id = '%s.%s' % (self.id, data_source_id)
+ lock_filename = '{}.lock'.format(data_source_id)
lock_filepath = os.path.join(self._store_dir, lock_filename)
existing_ds = None
for ds in self._data_sources:
- if ds.name == name:
+ if ds.id == data_source_id:
if lock_file and os.path.isfile(lock_filepath):
with open(lock_filepath, 'r') as lock_file:
writer_pid = lock_file.readline()
if psutil.pid_exists(int(writer_pid)):
raise ValueError("Cannot access data source {}, another process is using it (pid:{}"
- .format(ds.name, writer_pid))
+ .format(ds.id, writer_pid))
# ds.temporal_coverage() == time_range and
if ds.spatial_coverage() == region \
and ds.variables_info == var_names:
existing_ds = ds
break
raise ValueError("Local data store '{}' already contains a data source named '{}'"
- .format(self.name, name))
+ .format(self.id, data_source_id))
if existing_ds:
data_source = existing_ds
else:
- data_source = LocalDataSource(name, files=[], data_store=self, spatial_coverage=region,
+ data_source = LocalDataSource(data_source_id, files=[], data_store=self, spatial_coverage=region,
reference_type=reference_type, reference_name=reference_name,
meta_info=meta_info)
if lock_file:
@@ -606,16 +624,15 @@ def create_data_source(self, name: str, region: PolygonLike.TYPE = None,
def data_store_path(self):
return self._store_dir
- def query(self, name=None, monitor: Monitor = Monitor.NONE) -> Sequence[LocalDataSource]:
+ def query(self, id: str = None, query_expr: str = None,
+ monitor: Monitor = Monitor.NONE) -> Sequence[LocalDataSource]:
self._init_data_sources()
- if name:
- result = [ds for ds in self._data_sources if ds.matches_filter(name)]
- else:
- result = self._data_sources
- return result
+ if id or query_expr:
+ return [ds for ds in self._data_sources if ds.matches(id=id, query_expr=query_expr)]
+ return self._data_sources
def __repr__(self):
- return "LocalFilePatternDataStore(%s)" % repr(self.name)
+ return "LocalFilePatternDataStore(%s)" % repr(self.id)
def _repr_html_(self):
self._init_data_sources()
@@ -625,7 +642,7 @@ def _repr_html_(self):
row_count += 1
# noinspection PyProtectedMember
rows.append('