Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

277 kb unique id local ds #347

Merged
merged 7 commits into from
Sep 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cate/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
from typing import Tuple, Union, List, Dict, Any, Optional

from cate.conf.defaults import WEBAPI_INFO_FILE, WEBAPI_ON_INACTIVITY_AUTO_STOP_AFTER
from cate.core.types import Like, TimeRangeLike
from cate.core.types import Like, TimeRangeLike, PolygonLike, VarNamesLike
from cate.core.ds import DATA_STORE_REGISTRY, find_data_sources
from cate.core.objectio import OBJECT_IO_REGISTRY, find_writer, read_object
from cate.core.op import OP_REGISTRY
Expand Down Expand Up @@ -1230,13 +1230,13 @@ def _execute_copy(cls, command_args):
if data_source is None:
raise RuntimeError('internal error: no local data source found: %s' % ds_name)

local_name = command_args.name if command_args.name else ds_name
local_name = command_args.name if command_args.name else None

time_range = command_args.time
region = command_args.region
var_names = command_args.vars
time_range = TimeRangeLike.convert(command_args.time)
region = PolygonLike.convert(command_args.region)
var_names = VarNamesLike.convert(command_args.vars)

ds = data_source.make_local(local_name, None, time_range=time_range, region=region, var_names=var_names,
ds = data_source.make_local(local_name, time_range=time_range, region=region, var_names=var_names,
monitor=cls.new_monitor())
if ds:
print("Local data source with name '%s' has been created." % ds.id)
Expand Down
2 changes: 1 addition & 1 deletion cate/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def convert(cls, value: Any) -> Optional[VarNames]:
raise ValueError('Variable name pattern can only be a string'
' or a list of strings.')

return value
return value.copy()

@classmethod
def format(cls, value: Optional[VarNames]) -> str:
Expand Down
27 changes: 16 additions & 11 deletions cate/ds/esa_cci_odp.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, Schema, \
open_xarray_dataset, get_data_stores_path
from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike, VarNames
from cate.ds.local import add_to_data_store_registry, LocalDataSource
from cate.ds.local import add_to_data_store_registry, LocalDataSource, LocalDataStore
from cate.util.monitor import Monitor

__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
Expand Down Expand Up @@ -956,10 +956,6 @@ def make_local(self,
region: PolygonLike.TYPE = None,
var_names: VarNamesLike.TYPE = None,
monitor: Monitor = Monitor.NONE) -> Optional[DataSource]:
if not local_name:
raise ValueError('local_name is required')
elif len(local_name) == 0:
raise ValueError('local_name cannot be empty')

local_store = DATA_STORE_REGISTRY.get_data_store('local')
if not local_store:
Expand All @@ -969,22 +965,31 @@ def make_local(self,
raise ValueError('Cannot initialize `local` DataStore')

local_meta_info = self.meta_info.copy()
if local_meta_info.get('uuid'):
del local_meta_info['uuid']
local_meta_info['ref_uuid'] = self.meta_info['uuid']

local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
time_range, var_names, meta_info=local_meta_info, lock_file=True)
if not local_name or len(local_name) == 0:
local_name = "local.{}.{}".format(self.id, LocalDataStore.generate_uuid(ref_id=self.id,
time_range=time_range,
region=region,
var_names=var_names))
existing_ds_list = local_store.query(local_name)
if len(existing_ds_list) == 1:
return existing_ds_list[0]

local_ds = local_store.create_data_source(local_name,
time_range=time_range, region=region, var_names=var_names,
meta_info=local_meta_info, lock_file=True)
if local_ds:
if not local_ds.is_complete:
self._make_local(local_ds, time_range, region, var_names, monitor=monitor)

if local_ds.is_empty:
local_store.remove_data_source(local_ds)
return None

local_store.register_ds(local_ds)
return local_ds
return None
else:
return None

def _init_file_list(self, monitor: Monitor = Monitor.NONE):
if self._file_list:
Expand Down
71 changes: 62 additions & 9 deletions cate/ds/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import os
import psutil
import shutil
import uuid
import xarray as xr
from collections import OrderedDict
from datetime import datetime
Expand All @@ -54,7 +55,7 @@
from cate.conf.defaults import NETCDF_COMPRESSION_LEVEL
from cate.core.ds import DATA_STORE_REGISTRY, DataStore, DataSource, open_xarray_dataset
from cate.core.ds import get_data_stores_path
from cate.core.types import PolygonLike, TimeRange, TimeRangeLike, VarNamesLike
from cate.core.types import Polygon, PolygonLike, TimeRange, TimeRangeLike, VarNames, VarNamesLike
from cate.util.monitor import Monitor

__author__ = "Norman Fomferra (Brockmann Consult GmbH), " \
Expand All @@ -63,6 +64,8 @@

_REFERENCE_DATA_SOURCE_TYPE = "FILE_PATTERN"

_NAMESPACE = uuid.UUID(bytes=b"1234567890123456", version=3)


def get_data_store_path():
return os.environ.get('CATE_LOCAL_DATA_STORE_PATH',
Expand Down Expand Up @@ -338,10 +341,8 @@ def make_local(self,
region: PolygonLike.TYPE = None,
var_names: VarNamesLike.TYPE = None,
monitor: Monitor = Monitor.NONE) -> Optional[DataSource]:
if not local_name:
raise ValueError('local_name is required')
elif len(local_name) == 0:
raise ValueError('local_name cannot be empty')
if not local_name or len(local_name) == 0:
local_name = self.title

local_store = DATA_STORE_REGISTRY.get_data_store('local')
if not local_store:
Expand All @@ -350,8 +351,18 @@ def make_local(self,
if not local_store:
raise ValueError('Cannot initialize `local` DataStore')

local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, self.id,
meta_info=self.meta_info)
if not local_name or len(local_name) == 0:
local_name = "local.{}.{}".format(self.id, LocalDataStore.generate_uuid(ref_id=self.id,
time_range=time_range,
region=region,
var_names=var_names))
existing_ds_list = local_store.query(local_name)
if len(existing_ds_list) == 1:
return existing_ds_list[0]

local_ds = local_store.create_data_source(local_name, region, _REFERENCE_DATA_SOURCE_TYPE, local_name,
time_range=time_range, var_names=var_names,
meta_info=self.meta_info.copy())
if local_ds:
if not local_ds.is_complete:
self._make_local(local_ds, time_range, region, var_names, monitor=monitor)
Expand Down Expand Up @@ -642,15 +653,56 @@ def register_ds(self, data_source: DataSource):
data_source.set_completed(True)
self._data_sources.append(data_source)

@classmethod
def generate_uuid(cls, ref_id: str,
                  time_range: Optional[TimeRange] = None,
                  region: Optional[Polygon] = None,
                  var_names: Optional[VarNames] = None) -> str:
    """
    Derive a deterministic id string for a local data source.

    A name-based (version 3) UUID is computed from the reference data
    source id combined with the formatted constraints, so the same
    combination of inputs always yields the same id.

    :param ref_id: id of the reference (remote) data source
    :param time_range: optional temporal constraint folded into the name
    :param region: optional spatial constraint folded into the name
    :param var_names: optional variable-names constraint folded into the name
    :return: string form of the derived UUID
    """
    # Fold each supplied constraint into the name so that different
    # subsets of the same reference data source get distinct ids.
    if time_range:
        ref_id += TimeRangeLike.format(time_range)
    if region:
        ref_id += PolygonLike.format(region)
    if var_names:
        ref_id += VarNamesLike.format(var_names)

    # Fix: the original annotated the return type as uuid.UUID,
    # but the value actually returned is a string.
    return str(uuid.uuid3(_NAMESPACE, ref_id))

@classmethod
def generate_title(cls, title: str,
                   time_range: Optional[TimeRange] = None,
                   region: Optional[Polygon] = None,
                   var_names: Optional[VarNames] = None) -> str:
    """
    Build a human-readable title for a local data source.

    Each supplied constraint is appended to *title* in a bracketed
    ``[Key:value]`` section, e.g. ``"SM [Region:POLYGON (...)]"``.

    :param title: base title of the data source
    :param time_range: optional temporal constraint to append
    :param region: optional spatial constraint to append
    :param var_names: optional variable-names constraint to append
    :return: the composed title string
    """
    # Fix: the original annotated the return type as uuid.UUID,
    # but the method builds and returns a plain string.
    if time_range:
        title += " [TimeRange:{}]".format(TimeRangeLike.format(time_range))
    if region:
        title += " [Region:{}]".format(PolygonLike.format(region))
    if var_names:
        title += " [Variables:{}]".format(VarNamesLike.format(var_names))

    return title

def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = None,
reference_type: str = None, reference_name: str = None,
reference_type: str = None, title: str = None,
time_range: TimeRangeLike.TYPE = None, var_names: VarNamesLike.TYPE = None,
meta_info: OrderedDict = None, lock_file: bool = False):
self._init_data_sources()

if meta_info:
meta_info['title'] = title

if meta_info.get('uuid'):
meta_info['ref_uuid'] = meta_info['uuid']
del meta_info['uuid']

lock_filepath = os.path.join(self._store_dir, '{}.lock'.format(data_source_id))

if not data_source_id.startswith('%s.' % self.id):
data_source_id = '%s.%s' % (self.id, data_source_id)
lock_filename = '{}.lock'.format(data_source_id)
lock_filepath = os.path.join(self._store_dir, lock_filename)

data_source = None
for ds in self._data_sources:
if ds.id == data_source_id:
Expand All @@ -670,7 +722,8 @@ def create_data_source(self, data_source_id: str, region: PolygonLike.TYPE = Non
.format(self.id, data_source_id))
if not data_source:
data_source = LocalDataSource(data_source_id, files=[], data_store=self, spatial_coverage=region,
reference_type=reference_type, reference_name=reference_name,
variables=var_names, temporal_coverage=time_range,
reference_type=reference_type, reference_name=title,
meta_info=meta_info)
data_source.set_completed(False)
self._save_data_source(data_source)
Expand Down
68 changes: 46 additions & 22 deletions test/ds/test_esa_cci_odp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import shutil

from cate.core.ds import DATA_STORE_REGISTRY
from cate.core.types import PolygonLike, TimeRangeLike
from cate.core.types import PolygonLike, TimeRangeLike, VarNamesLike
from cate.ds.esa_cci_odp import EsaCciOdpDataStore, find_datetime_format
from cate.ds.local import LocalDataStore

Expand Down Expand Up @@ -124,18 +124,18 @@ def build_file_item(item_name: str, date_from: datetime, date_to: datetime, size

with unittest.mock.patch('cate.ds.esa_cci_odp.EsaCciOdpDataSource._find_files', find_files_mock):
with unittest.mock.patch.object(EsaCciOdpDataStore, 'query', return_value=[]):

new_ds_title = 'local_ds_test'
new_ds_time_range = TimeRangeLike.convert((datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 16, 23, 59)))
try:
new_ds = self.first_oc_data_source.make_local('local_ds_test', None,
(datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 15, 23, 59)))
new_ds = self.first_oc_data_source.make_local(new_ds_title, time_range=new_ds_time_range)
except:
raise ValueError(reference_path, os.listdir(reference_path))
self.assertIsNotNone(new_ds)

self.assertEqual(new_ds.id, 'local.local_ds_test')
self.assertEqual(new_ds.temporal_coverage(),
(datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 15, 23, 59)))
self.assertEqual(new_ds.id, "local.%s" % new_ds_title)
self.assertEqual(new_ds.temporal_coverage(), new_ds_time_range)

self.first_oc_data_source.update_local(new_ds.id, (datetime.datetime(1978, 11, 15, 00, 00),
datetime.datetime(1978, 11, 16, 23, 59)))
Expand All @@ -154,28 +154,52 @@ def build_file_item(item_name: str, date_from: datetime, date_to: datetime, size
datetime.datetime(1978, 11, 16, 23, 59)))
self.assertTrue("Couldn't find local DataSource", context.exception.args[0])

new_ds_w_one_variable_title = 'local_ds_test_var'
new_ds_w_one_variable_time_range = TimeRangeLike.convert((datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 16, 23, 59)))
new_ds_w_one_variable_var_names = VarNamesLike.convert(['sm'])

new_ds_w_one_variable = self.first_oc_data_source.make_local(
'local_ds_test_2', None, (datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 15, 23, 59)), None, ['sm'])
new_ds_w_one_variable_title,
time_range=new_ds_w_one_variable_time_range,
var_names=new_ds_w_one_variable_var_names
)
self.assertIsNotNone(new_ds_w_one_variable)
self.assertEqual(new_ds_w_one_variable.id, 'local.local_ds_test_2')

self.assertEqual(new_ds_w_one_variable.id, "local.%s" % new_ds_w_one_variable_title)
ds = new_ds_w_one_variable.open_dataset()
self.assertSetEqual(set(ds.variables), {'sm', 'lat', 'lon', 'time'})

new_ds_w_one_variable_var_names.extend(['lat', 'lon', 'time'])

self.assertSetEqual(set(ds.variables),
set(new_ds_w_one_variable_var_names))

new_ds_w_region_title = 'from_local_to_local_region'
new_ds_w_region_time_range = TimeRangeLike.convert((datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 16, 23, 59)))
new_ds_w_region_var_names = VarNamesLike.convert(['sm'])
new_ds_w_region_spatial_coverage = PolygonLike.convert("10,10,20,20")

new_ds_w_region = self.first_oc_data_source.make_local(
'from_local_to_local_region', None, (datetime.datetime(1978, 11, 14, 0, 0),
datetime.datetime(1978, 11, 15, 23, 59)),
"10,10,20,20", ['sm']) # type: LocalDataSource
new_ds_w_region_title,
time_range=new_ds_w_region_time_range,
var_names=new_ds_w_region_var_names,
region=new_ds_w_region_spatial_coverage) # type: LocalDataSource

self.assertIsNotNone(new_ds_w_region)
self.assertEqual(new_ds_w_region.id, 'local.from_local_to_local_region')
self.assertEqual(new_ds_w_region.spatial_coverage(), PolygonLike.convert("10,10,20,20"))

self.assertEqual(new_ds_w_region.id, "local.%s" % new_ds_w_region_title)

self.assertEqual(new_ds_w_region.spatial_coverage(), new_ds_w_region_spatial_coverage)
data_set = new_ds_w_region.open_dataset()
self.assertSetEqual(set(data_set.variables), {'sm', 'lat', 'lon', 'time'})

no_data = self.first_oc_data_source.make_local(
'empty_ds', None, (datetime.datetime(2017, 12, 1, 0, 0),
datetime.datetime(2017, 12, 31, 23, 59)),
)
new_ds_w_region_var_names.extend(['lat', 'lon', 'time'])

self.assertSetEqual(set(data_set.variables), set(new_ds_w_region_var_names))

no_data = self.first_oc_data_source.make_local('empty_ds',
time_range=(datetime.datetime(2017, 12, 1, 0, 0),
datetime.datetime(2017, 12, 31, 23, 59)))
self.assertIsNone(no_data)

def test_data_store(self):
Expand Down
Loading