Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
Merge pull request #347 from CCI-Tools/277-kb-unique-id-local-ds
Browse files Browse the repository at this point in the history
Unique ID for local copies of remote data sources.
Closes #277
  • Loading branch information
kbernat authored Sep 7, 2017
2 parents 99290ec + 5e6cae3 commit eab7719
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 71 deletions.
12 changes: 6 additions & 6 deletions cate/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
from typing import Tuple, Union, List, Dict, Any, Optional

from cate.conf.defaults import WEBAPI_INFO_FILE, WEBAPI_ON_INACTIVITY_AUTO_STOP_AFTER
from cate.core.types import Like, TimeRangeLike
from cate.core.types import Like, TimeRangeLike, PolygonLike, VarNamesLike
from cate.core.ds import DATA_STORE_REGISTRY, find_data_sources
from cate.core.objectio import OBJECT_IO_REGISTRY, find_writer, read_object
from cate.core.op import OP_REGISTRY
Expand Down Expand Up @@ -1230,13 +1230,13 @@ def _execute_copy(cls, command_args):
if data_source is None:
raise RuntimeError('internal error: no local data source found: %s' % ds_name)

local_name = command_args.name if command_args.name else ds_name
local_name = command_args.name if command_args.name else None

time_range = command_args.time
region = command_args.region
var_names = command_args.vars
time_range = TimeRangeLike.convert(command_args.time)
region = PolygonLike.convert(command_args.region)
var_names = VarNamesLike.convert(command_args.vars)

ds = data_source.make_local(local_name, None, time_range=time_range, region=region, var_names=var_names,
ds = data_source.make_local(local_name, time_range=time_range, region=region, var_names=var_names,
monitor=cls.new_monitor())
if ds:
print("Local data source with name '%s' has been created." % ds.id)
Expand Down
2 changes: 1 addition & 1 deletion cate/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def convert(cls, value: Any) -> Optional[VarNames]:
raise ValueError('Variable name pattern can only be a string'
' or a list of strings.')

return value
return value.copy()

@classmethod
def format(cls, value: Optional[VarNames]) -> str:
Expand Down
27 changes: 16 additions & 11 deletions cate/ds/esa_cci_odp.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, \
open_xarray_dataset, get_data_stores_path
from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike, VarNames
from cate.ds.local import add_to_data_store_registry, LocalDataSource
from cate.ds.local import add_to_data_store_registry, LocalDataSource, LocalDataStore
from cate.util.monitor import Monitor

__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
Expand Down Expand Up @@ -956,10 +956,6 @@ def make_local(self,
region: PolygonLike.TYPE = None,
var_names: VarNamesLike.TYPE = None,
monitor: Monitor = Monitor.NONE) -> Optional[DataSource]:
if not local_name:
raise ValueError('local_name is required')
elif len(local_name) == 0:
raise ValueError('local_name cannot be empty')

local_store = DATA_STORE_REGISTRY.get_data_store('local')
if not local_store:
Expand All @@ -969,22 +965,31 @@ def make_local(self,
raise ValueError('Cannot initialize `local` DataStore')

local_meta_info = self.meta_info.copy()
if local_meta_info.get('uuid'):
del local_meta_info['uuid']
local_meta_info['ref_uuid'] = self.meta_info['uuid']

local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
time_range, var_names, meta_info=local_meta_info, lock_file=True)
if not local_name or len(local_name) == 0:
local_name = "local.{}.{}".format(self.id, LocalDataStore.generate_uuid(ref_id=self.id,
time_range=time_range,
region=region,
var_names=var_names))
existing_ds_list = local_store.query(local_name)
if len(existing_ds_list) == 1:
return existing_ds_list[0]

local_ds = local_store.create_data_source(local_name,
time_range=time_range, region=region, var_names=var_names,
meta_info=local_meta_info, lock_file=True)
if local_ds:
if not local_ds.is_complete:
self._make_local(local_ds, time_range, region, var_names, monitor=monitor)

if local_ds.is_empty:
local_store.remove_data_source(local_ds)
return None

local_store.register_ds(local_ds)
return local_ds
return None
else:
return None

def _init_file_list(self, monitor: Monitor = Monitor.NONE):
if self._file_list:
Expand Down
71 changes: 62 additions & 9 deletions cate/ds/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import os
import psutil
import shutil
import uuid
import xarray as xr
from collections import OrderedDict
from datetime import datetime
Expand All @@ -54,7 +55,7 @@
from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, open_xarray_dataset
from cate.core.ds import get_data_stores_path
from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike
from cate.core.types import Polygon, PolygonLike, TimeRange, TimeRangeLike, VarNames, VarNamesLike
from cate.util.monitor import Monitor

__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
Expand All @@ -63,6 +64,8 @@

_REFERENCE_DATA_SOURCE_TYPE = "FILE_PATTERN"

_NAMESPACE = uuid.UUID(bytes=b"1234567890123456", version=3)


def get_data_store_path():
return os.environ.get('CATE_LOCAL_DATA_STORE_PATH',
Expand Down Expand Up @@ -338,10 +341,8 @@ def make_local(self,
region: PolygonLike.TYPE = None,
var_names: VarNamesLike.TYPE = None,
monitor: Monitor = Monitor.NONE) -> Optional[DataSource]:
if not local_name:
raise ValueError('local_name is required')
elif len(local_name) == 0:
raise ValueError('local_name cannot be empty')
if not local_name or len(local_name) == 0:
local_name = self.title

local_store = DATA_STORE_REGISTRY.get_data_store('local')
if not local_store:
Expand All @@ -350,8 +351,18 @@ def make_local(self,
if not local_store:
raise ValueError('Cannot initialize `local` DataStore')

local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
meta_info=self.meta_info)
if not local_name or len(local_name) == 0:
local_name = "local.{}.{}".format(self.id, LocalDataStore.generate_uuid(ref_id=self.id,
time_range=time_range,
region=region,
var_names=var_names))
existing_ds_list = local_store.query(local_name)
if len(existing_ds_list) == 1:
return existing_ds_list[0]

local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, local_name,
time_range=time_range, var_names=var_names,
meta_info=self.meta_info.copy())
if local_ds:
if not local_ds.is_complete:
self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
Expand Down Expand Up @@ -642,15 +653,56 @@ def register_ds(self, data_source: DataSource):
data_source.set_completed(True)
self._data_sources.append(data_source)

@classmethod
def generate_uuid(cls, ref_id: str,
                  time_range: Optional[TimeRange] = None,
                  region: Optional[Polygon] = None,
                  var_names: Optional[VarNames] = None) -> str:
    """
    Generate a deterministic UUID string identifying a local copy of a remote
    data source.

    The UUID is derived via :func:`uuid.uuid3` from the referenced data source
    id concatenated with the canonical formatted representation of each applied
    constraint, so the same source/constraint combination always yields the
    same id.

    :param ref_id: id of the referenced (remote) data source
    :param time_range: optional temporal constraint included in the hash key
    :param region: optional spatial constraint included in the hash key
    :param var_names: optional variable-names constraint included in the hash key
    :return: the generated UUID rendered as a string
    """
    # Build the hash key in a local variable instead of mutating the
    # caller-supplied ref_id parameter.
    key = ref_id
    if time_range:
        key += TimeRangeLike.format(time_range)
    if region:
        key += PolygonLike.format(region)
    if var_names:
        key += VarNamesLike.format(var_names)

    # NOTE: the function returns str, not uuid.UUID — the previous return
    # annotation (uuid.UUID) was wrong; callers embed this value in id strings.
    return str(uuid.uuid3(_NAMESPACE, key))

@classmethod
def generate_title(cls, title: str,
                   time_range: Optional[TimeRange] = None,
                   region: Optional[Polygon] = None,
                   var_names: Optional[VarNames] = None) -> str:
    """
    Generate a human-readable title for a local copy of a remote data source,
    appending one bracketed section per applied constraint.

    :param title: base title of the referenced data source
    :param time_range: optional temporal constraint, appended as ``[TimeRange:...]``
    :param region: optional spatial constraint, appended as ``[Region:...]``
    :param var_names: optional variable-names constraint, appended as ``[Variables:...]``
    :return: the decorated title string
    """
    # NOTE: returns str — the previous return annotation (uuid.UUID) was wrong.
    if time_range:
        title += " [TimeRange:{}]".format(TimeRangeLike.format(time_range))
    if region:
        title += " [Region:{}]".format(PolygonLike.format(region))
    if var_names:
        title += " [Variables:{}]".format(VarNamesLike.format(var_names))

    return title

def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = None,
reference_type: str = None, reference_name: str = None,
reference_type: str = None, title: str = None,
time_range: TimeRangeLike.TYPE = None, var_names: VarNamesLike.TYPE = None,
meta_info: OrderedDict = None, lock_file: bool = False):
self._init_data_sources()

if meta_info:
meta_info['title'] = title

if meta_info.get('uuid'):
meta_info['ref_uuid'] = meta_info['uuid']
del meta_info['uuid']

lock_filepath = os.path.join(self._store_dir, '{}.lock'.format(data_source_id))

if not data_source_id.startswith('%s.' % self.id):
data_source_id = '%s.%s' % (self.id, data_source_id)
lock_filename = '{}.lock'.format(data_source_id)
lock_filepath = os.path.join(self._store_dir, lock_filename)

data_source = None
for ds in self._data_sources:
if ds.id == data_source_id:
Expand All @@ -670,7 +722,8 @@ def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = Non
.format(self.id, data_source_id))
if not data_source:
data_source = LocalDataSource(data_source_id, files=[], data_store=self, spatial_coverage=region,
reference_type=reference_type, reference_name=reference_name,
variables=var_names, temporal_coverage=time_range,
reference_type=reference_type, reference_name=title,
meta_info=meta_info)
data_source.set_completed(False)
self._save_data_source(data_source)
Expand Down
68 changes: 46 additions & 22 deletions test/ds/test_esa_cci_odp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import shutil

from cate.core.ds import DATA_STORE_REGISTRY
from cate.core.types import PolygonLike, TimeRangeLike
from cate.core.types import PolygonLike, TimeRangeLike, VarNamesLike
from cate.ds.esa_cci_odp import EsaCciOdpDataStore, find_datetime_format
from cate.ds.local import LocalDataStore

Expand Down Expand Up @@ -124,18 +124,18 @@ def build_file_item(item_name: str, date_from: datetime, date_to: datetime, size

with unittest.mock.patch('cate.ds.esa_cci_odp.EsaCciOdpDataSource._find_files', find_files_mock):
with unittest.mock.patch.object(EsaCciOdpDataStore, 'query', return_value=[]):

new_ds_title = 'local_ds_test'
new_ds_time_range = TimeRangeLike.convert((datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 16, 23, 59)))
try:
new_ds = self.first_oc_data_source.make_local('local_ds_test', None,
(datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 15, 23, 59)))
new_ds = self.first_oc_data_source.make_local(new_ds_title, time_range=new_ds_time_range)
except:
raise ValueError(reference_path, os.listdir(reference_path))
self.assertIsNotNone(new_ds)

self.assertEqual(new_ds.id, 'local.local_ds_test')
self.assertEqual(new_ds.temporal_coverage(),
(datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 15, 23, 59)))
self.assertEqual(new_ds.id, "local.%s" % new_ds_title)
self.assertEqual(new_ds.temporal_coverage(), new_ds_time_range)

self.first_oc_data_source.update_local(new_ds.id, (datetime.datetime(1978, 11, 15, 00, 00),
datetime.datetime(1978, 11, 16, 23, 59)))
Expand All @@ -154,28 +154,52 @@ def build_file_item(item_name: str, date_from: datetime, date_to: datetime, size
datetime.datetime(1978, 11, 16, 23, 59)))
self.assertTrue("Couldn't find local DataSource", context.exception.args[0])

new_ds_w_one_variable_title = 'local_ds_test_var'
new_ds_w_one_variable_time_range = TimeRangeLike.convert((datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 16, 23, 59)))
new_ds_w_one_variable_var_names = VarNamesLike.convert(['sm'])

new_ds_w_one_variable = self.first_oc_data_source.make_local(
'local_ds_test_2', None, (datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 15, 23, 59)), None, ['sm'])
new_ds_w_one_variable_title,
time_range=new_ds_w_one_variable_time_range,
var_names=new_ds_w_one_variable_var_names
)
self.assertIsNotNone(new_ds_w_one_variable)
self.assertEqual(new_ds_w_one_variable.id, 'local.local_ds_test_2')

self.assertEqual(new_ds_w_one_variable.id, "local.%s" % new_ds_w_one_variable_title)
ds = new_ds_w_one_variable.open_dataset()
self.assertSetEqual(set(ds.variables), {'sm', 'lat', 'lon', 'time'})

new_ds_w_one_variable_var_names.extend(['lat', 'lon', 'time'])

self.assertSetEqual(set(ds.variables),
set(new_ds_w_one_variable_var_names))

new_ds_w_region_title = 'from_local_to_local_region'
new_ds_w_region_time_range = TimeRangeLike.convert((datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 16, 23, 59)))
new_ds_w_region_var_names = VarNamesLike.convert(['sm'])
new_ds_w_region_spatial_coverage = PolygonLike.convert("10,10,20,20")

new_ds_w_region = self.first_oc_data_source.make_local(
'from_local_to_local_region', None, (datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 15, 23, 59)),
"10,10,20,20", ['sm']) # type: LocalDataSource
new_ds_w_region_title,
time_range=new_ds_w_region_time_range,
var_names=new_ds_w_region_var_names,
region=new_ds_w_region_spatial_coverage) # type: LocalDataSource

self.assertIsNotNone(new_ds_w_region)
self.assertEqual(new_ds_w_region.id, 'local.from_local_to_local_region')
self.assertEqual(new_ds_w_region.spatial_coverage(), PolygonLike.convert("10,10,20,20"))

self.assertEqual(new_ds_w_region.id, "local.%s" % new_ds_w_region_title)

self.assertEqual(new_ds_w_region.spatial_coverage(), new_ds_w_region_spatial_coverage)
data_set = new_ds_w_region.open_dataset()
self.assertSetEqual(set(data_set.variables), {'sm', 'lat', 'lon', 'time'})

no_data = self.first_oc_data_source.make_local(
'empty_ds', None, (datetime.datetime(2017, 12, 1, 0, 0),
datetime.datetime(2017, 12, 31, 23, 59)),
)
new_ds_w_region_var_names.extend(['lat', 'lon', 'time'])

self.assertSetEqual(set(data_set.variables), set(new_ds_w_region_var_names))

no_data = self.first_oc_data_source.make_local('empty_ds',
time_range=(datetime.datetime(2017, 12, 1, 0, 0),
datetime.datetime(2017, 12, 31, 23, 59)))
self.assertIsNone(no_data)

def test_data_store(self):
Expand Down
Loading

0 comments on commit eab7719

Please sign in to comment.