Skip to content

Commit

Permalink
import: introduce --store
Browse files Browse the repository at this point in the history
Fixes #4527
  • Loading branch information
charlesbaynham authored and efiop committed Dec 2, 2020
1 parent f50f12a commit 0f8f582
Show file tree
Hide file tree
Showing 14 changed files with 187 additions and 15 deletions.
11 changes: 11 additions & 0 deletions dvc/command/imp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def run(self):
rev=self.args.rev,
no_exec=self.args.no_exec,
desc=self.args.desc,
store=self.args.store,
)
except DvcException:
logger.exception(
Expand Down Expand Up @@ -82,4 +83,14 @@ def add_parser(subparsers, parent_parser):
"This doesn't affect any DVC operations."
),
)
import_parser.add_argument(
"-s",
"--store",
action="store_true",
default=False,
help=(
"Enable pushing the imported data to remote storage, and "
"pulling it from there (instead of reimporting)."
),
)
import_parser.set_defaults(func=CmdImport)
6 changes: 4 additions & 2 deletions dvc/dependency/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@

# NOTE: schema for dependencies is basically the same as for outputs, but
# without output-specific entries like 'cache' (whether or not output is
# cached, see -o and -O flags for `dvc run`) and 'metric' (whether or not
# output is a metrics file and how to parse it, see `-M` flag for `dvc run`).
# cached, see -o and -O flags for `dvc run`), 'metric' (whether or not output
# is a metric file and how to parse it, see `-M` flag for `dvc run`) and
# `store` (whether to push outputs to the remote)
SCHEMA = output.SCHEMA.copy()
del SCHEMA[BaseOutput.PARAM_CACHE]
del SCHEMA[BaseOutput.PARAM_METRIC]
del SCHEMA[BaseOutput.PARAM_DESC]
del SCHEMA[BaseOutput.PARAM_STORE]
SCHEMA.update(RepoDependency.REPO_SCHEMA)
SCHEMA.update(ParamsDependency.PARAM_SCHEMA)

Expand Down
24 changes: 20 additions & 4 deletions dvc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,32 @@ def __init__(self, amount):


class CheckoutError(DvcException):
def __init__(self, target_infos, stats=None):
def __init__(self, target_infos, stats=None, failed_not_stored=None):
self.target_infos = target_infos
self.stats = stats
targets = [str(t) for t in target_infos]
targets = []
for t in target_infos:
if failed_not_stored and t in failed_not_stored:
targets.append(
"{} (marked with 'store: false' in its dvc file)".format(t)
)
else:
targets.append(str(t))

m = (
"Checkout failed for following targets:\n{}\nIs your "
"cache up to date?\n{}".format(
"Checkout failed for following targets:\n{}\n\n"
"Is your cache up to date?\n{}".format(
"\n".join(targets), error_link("missing-files"),
)
)

if failed_not_stored:
m = m + (
"\n\nNote that 'store: false' outputs are not "
"searched for in remote storage, only in the cache. "
"See {}".format(error_link("checkout-no-store"),)
)

super().__init__(m)


Expand Down
3 changes: 3 additions & 0 deletions dvc/hash_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class HashInfo:
def __bool__(self):
return bool(self.value)

def __str__(self):
return f"{self.name}: {self.value}"

@classmethod
def from_dict(cls, d):
_d = d.copy() if d else {}
Expand Down
9 changes: 9 additions & 0 deletions dvc/output/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
SCHEMA[HashInfo.PARAM_SIZE] = int
SCHEMA[HashInfo.PARAM_NFILES] = int
SCHEMA[BaseOutput.PARAM_DESC] = str
SCHEMA[BaseOutput.PARAM_STORE] = bool


def _get(
Expand All @@ -81,6 +82,7 @@ def _get(
persist=False,
checkpoint=False,
desc=None,
store=True,
):
parsed = urlparse(p)

Expand All @@ -97,6 +99,7 @@ def _get(
persist=persist,
checkpoint=checkpoint,
desc=desc,
store=store,
)

for o in OUTS:
Expand All @@ -112,6 +115,7 @@ def _get(
persist=persist,
checkpoint=checkpoint,
desc=desc,
store=store,
)
return LocalOutput(
stage,
Expand All @@ -124,13 +128,15 @@ def _get(
persist=persist,
checkpoint=checkpoint,
desc=desc,
store=store,
)


def loadd_from(stage, d_list):
ret = []
for d in d_list:
p = d.pop(BaseOutput.PARAM_PATH)
store = d.pop(BaseOutput.PARAM_STORE, not stage.is_repo_import)
cache = d.pop(BaseOutput.PARAM_CACHE, True)
metric = d.pop(BaseOutput.PARAM_METRIC, False)
plot = d.pop(BaseOutput.PARAM_PLOT, False)
Expand All @@ -142,6 +148,7 @@ def loadd_from(stage, d_list):
stage,
p,
info=d,
store=store,
cache=cache,
metric=metric,
plot=plot,
Expand All @@ -161,6 +168,7 @@ def loads_from(
plot=False,
persist=False,
checkpoint=False,
store=True,
):
return [
_get(
Expand All @@ -172,6 +180,7 @@ def loads_from(
plot=plot,
persist=persist,
checkpoint=checkpoint,
store=store,
)
for s in s_list
]
Expand Down
24 changes: 23 additions & 1 deletion dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ def __init__(self, match):
super().__init__(f"Path '{match.file}' is ignored by\n{lines}")


class OutputStoreCacheIncompatible(DvcException):
def __init__(self, path):
msg = f"""Incompatible options for output '{path}'.
The .dvc file specifies an output with both `cache == False` and
`store == True` which is not possible to satisfy.
"""
super().__init__(msg)


class BaseOutput:
IS_DEPENDENCY = False

Expand All @@ -76,6 +85,7 @@ class BaseOutput:
PARAM_PLOT_HEADER = "header"
PARAM_PERSIST = "persist"
PARAM_DESC = "desc"
PARAM_STORE = "store"

METRIC_SCHEMA = Any(
None,
Expand Down Expand Up @@ -105,6 +115,7 @@ def __init__(
persist=False,
checkpoint=False,
desc=None,
store=True,
):
self._validate_output_path(path, stage)
# This output (and dependency) objects have too many paths/urls
Expand All @@ -128,6 +139,8 @@ def __init__(
self.use_cache = False if self.IS_DEPENDENCY else cache
self.metric = False if self.IS_DEPENDENCY else metric
self.plot = False if self.IS_DEPENDENCY else plot
self.store = True if self.IS_DEPENDENCY else store

self.persist = persist
self.checkpoint = checkpoint
self.desc = desc
Expand Down Expand Up @@ -328,6 +341,9 @@ def dumpd(self):
if self.checkpoint:
ret[self.PARAM_CHECKPOINT] = self.checkpoint

if self.store == self.stage.is_repo_import:
ret[self.PARAM_STORE] = self.store

return ret

def verify_metric(self):
Expand Down Expand Up @@ -484,10 +500,16 @@ def get_used_cache(self, **kwargs):
"""

if not self.use_cache:
if self.store:
raise OutputStoreCacheIncompatible(self)
return NamedCache()

if self.stage.is_repo_import:
if not self.store:
cache = NamedCache()
if not self.stage.is_repo_import:
return cache
# If this out isn't backed up but it is an import, then it's an
# external dependancy and can be retrieved from its source
(dep,) = self.stage.deps
cache.external[dep.repo_pair].add(dep.def_path)
return cache
Expand Down
10 changes: 8 additions & 2 deletions dvc/repo/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def checkout(
"deleted": [],
"modified": [],
"failed": [],
"failed_not_stored": [],
}
if not targets:
targets = [None]
Expand Down Expand Up @@ -117,8 +118,13 @@ def checkout(
for key, items in result.items():
stats[key].extend(_fspath_dir(path) for path in items)

if stats.get("failed"):
raise CheckoutError(stats["failed"], stats)
if stats.get("failed") or stats.get("failed_not_stored"):
raise CheckoutError(
stats["failed"] + stats["failed_not_stored"],
stats,
failed_not_stored=stats["failed_not_stored"],
)

del stats["failed"]
del stats["failed_not_stored"]
return stats
4 changes: 3 additions & 1 deletion dvc/repo/imp_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def imp_url(
frozen=True,
no_exec=False,
desc=None,
store=True,
):
from dvc.dvcfile import Dvcfile
from dvc.stage import Stage, create_stage, restore_meta
Expand All @@ -34,14 +35,15 @@ def imp_url(
):
url = relpath(url, wdir)

kwargs = {"outs": [out]} if store else {"outs_no_store": [out]}
stage = create_stage(
Stage,
self,
fname or path,
wdir=wdir,
deps=[url],
outs=[out],
erepo=erepo,
**kwargs,
)
restore_meta(stage)
if stage.can_be_skipped:
Expand Down
9 changes: 5 additions & 4 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,9 @@ def is_import(self):

@property
def is_repo_import(self):
if not self.is_import:
return False

return isinstance(self.deps[0], dependency.RepoDependency)
return len(self.deps) == 1 and isinstance(
self.deps[0], dependency.RepoDependency
)

@property
def is_checkpoint(self):
Expand Down Expand Up @@ -540,6 +539,8 @@ def _checkout(out, **kwargs):
return None, []
return "modified" if modified else "added", [out.path_info]
except CheckoutError as exc:
if not out.store:
return "failed_not_stored", exc.target_infos
return "failed", exc.target_infos

@rwlocked(read=["deps", "outs"])
Expand Down
2 changes: 2 additions & 0 deletions dvc/stage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def fill_stage_outputs(stage, **kwargs):
"outs_no_cache",
"outs",
"checkpoints",
"outs_no_store",
]

stage.outs = []
Expand All @@ -62,6 +63,7 @@ def fill_stage_outputs(stage, **kwargs):
stage,
kwargs.get(key, []),
use_cache="no_cache" not in key,
store="no_store" not in key and "no_cache" not in key,
persist="persist" in key,
metric="metrics" in key,
plot="plots" in key,
Expand Down
1 change: 1 addition & 0 deletions tests/func/test_checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ def test_checkout_stats_on_failure(tmp_dir, dvc, scm):
assert exc.value.stats == {
**empty_checkout,
"failed": ["foo"],
"failed_not_stored": [],
"modified": ["other"],
}

Expand Down
Loading

0 comments on commit 0f8f582

Please sign in to comment.