diff --git a/dvc/api.py b/dvc/api.py index 506baa6c82..1f5ae52851 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -1,6 +1,5 @@ from contextlib import contextmanager import os -import tempfile try: from contextlib import _GeneratorContextManager as GCM @@ -9,7 +8,7 @@ from dvc.utils.compat import urlparse from dvc.repo import Repo -from dvc.external_repo import ExternalRepo +from dvc.external_repo import external_repo def get_url(path, repo=None, rev=None, remote=None): @@ -68,10 +67,5 @@ def _make_repo(repo_url, rev=None): assert rev is None, "Custom revision is not supported for local repo" yield Repo(repo_url) else: - tmp_dir = tempfile.mkdtemp("dvc-repo") - ext_repo = ExternalRepo(tmp_dir, url=repo_url, rev=rev) - try: - ext_repo.install() - yield ext_repo.repo - finally: - ext_repo.uninstall() + with external_repo(url=repo_url, rev=rev) as repo: + yield repo diff --git a/dvc/cli.py b/dvc/cli.py index 7def718bcf..f42f949a02 100644 --- a/dvc/cli.py +++ b/dvc/cli.py @@ -36,6 +36,7 @@ import dvc.command.tag as tag import dvc.command.diff as diff import dvc.command.version as version +import dvc.command.update as update from dvc.exceptions import DvcParserError @@ -70,6 +71,7 @@ tag, diff, version, + update, ] diff --git a/dvc/command/update.py b/dvc/command/update.py new file mode 100644 index 0000000000..a026ca9e7b --- /dev/null +++ b/dvc/command/update.py @@ -0,0 +1,37 @@ +from __future__ import unicode_literals + +import argparse +import logging + +from dvc.exceptions import DvcException +from dvc.command.base import CmdBase, append_doc_link + + +logger = logging.getLogger(__name__) + + +class CmdUpdate(CmdBase): + def run(self): + ret = 0 + for target in self.args.targets: + try: + self.repo.update(target) + except DvcException: + logger.exception("failed to update '{}'.".format(target)) + ret = 1 + return ret + + +def add_parser(subparsers, parent_parser): + UPDATE_HELP = "Update dependencies and reproduce specified DVC-files." 
+ update_parser = subparsers.add_parser( + "update", + parents=[parent_parser], + description=append_doc_link(UPDATE_HELP, "update"), + help=UPDATE_HELP, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + update_parser.add_argument( + "targets", nargs="+", help="DVC-files to update." + ) + update_parser.set_defaults(func=CmdUpdate) diff --git a/dvc/dependency/__init__.py b/dvc/dependency/__init__.py index 52a451c9ff..bd3dd1587e 100644 --- a/dvc/dependency/__init__.py +++ b/dvc/dependency/__init__.py @@ -16,7 +16,6 @@ from .repo import DependencyREPO from dvc.remote import Remote -from dvc.external_repo import ExternalRepo DEPS = [ @@ -47,7 +46,7 @@ SCHEMA = output.SCHEMA.copy() del SCHEMA[schema.Optional(OutputBase.PARAM_CACHE)] del SCHEMA[schema.Optional(OutputBase.PARAM_METRIC)] -SCHEMA[schema.Optional(DependencyREPO.PARAM_REPO)] = ExternalRepo.SCHEMA +SCHEMA[schema.Optional(DependencyREPO.PARAM_REPO)] = DependencyREPO.REPO_SCHEMA def _get(stage, p, info): diff --git a/dvc/dependency/base.py b/dvc/dependency/base.py index 2859ee496d..ecd4a3b5d5 100644 --- a/dvc/dependency/base.py +++ b/dvc/dependency/base.py @@ -20,3 +20,6 @@ class DependencyBase(object): DoesNotExistError = DependencyDoesNotExistError IsNotFileOrDirError = DependencyIsNotFileOrDirError + + def update(self): + raise NotImplementedError diff --git a/dvc/dependency/repo.py b/dvc/dependency/repo.py index 040f919d4e..f70f224067 100644 --- a/dvc/dependency/repo.py +++ b/dvc/dependency/repo.py @@ -1,43 +1,77 @@ from __future__ import unicode_literals -import os import copy -from dvc.utils.compat import urlparse -from dvc.external_repo import ExternalRepo +from funcy import merge +from schema import Optional +from contextlib import contextmanager + +from dvc.external_repo import external_repo +from dvc.utils.compat import str from .local import DependencyLOCAL class DependencyREPO(DependencyLOCAL): PARAM_REPO = "repo" + PARAM_URL = "url" + PARAM_REV = "rev" + PARAM_REV_LOCK = "rev_lock" - def 
__init__(self, erepo, stage, *args, **kwargs): - self.erepo = ExternalRepo(stage.repo.dvc_dir, **erepo) - super(DependencyLOCAL, self).__init__(stage, *args, **kwargs) - - def _parse_path(self, remote, path): - self.erepo.install(self.repo.cache.local.cache_dir) + REPO_SCHEMA = { + Optional(PARAM_URL): str, + Optional(PARAM_REV): str, + Optional(PARAM_REV_LOCK): str, + } - out_path = os.path.join( - self.erepo.repo.root_dir, urlparse(path).path.lstrip("/") - ) + def __init__(self, def_repo, stage, *args, **kwargs): + self.def_repo = def_repo + super(DependencyREPO, self).__init__(stage, *args, **kwargs) - out, = self.erepo.repo.find_outs_by_path(out_path) - self.info = copy.copy(out.info) - self._erepo_stage = copy.copy(out.stage.path) - return self.REMOTE.path_cls(out.cache_path) + def _parse_path(self, remote, path): + return None @property def is_in_repo(self): return False + def __str__(self): + return "{} ({})".format(self.def_path, self.def_repo[self.PARAM_URL]) + + @contextmanager + def _make_repo(self, **overrides): + with external_repo(**merge(self.def_repo, overrides)) as repo: + yield repo + + def status(self): + with self._make_repo() as repo: + current = repo.find_out_by_relpath(self.def_path).info + + with self._make_repo(rev_lock=None) as repo: + updated = repo.find_out_by_relpath(self.def_path).info + + if current != updated: + return {str(self): "update available"} + + return {} + + def save(self): + pass + def dumpd(self): - ret = super(DependencyLOCAL, self).dumpd() - ret[self.PARAM_REPO] = self.erepo.dumpd() - return ret + return {self.PARAM_PATH: self.def_path, self.PARAM_REPO: self.def_repo} def download(self, to, resume=False): - self.erepo.repo.fetch(self._erepo_stage) - to.info = copy.copy(self.info) - to.checkout() + with self._make_repo( + cache_dir=self.repo.cache.local.cache_dir + ) as repo: + self.def_repo[self.PARAM_REV_LOCK] = repo.scm.get_rev() + + out = repo.find_out_by_relpath(self.def_path) + repo.fetch(out.stage.path) + to.info 
= copy.copy(out.info) + to.checkout() + + def update(self): + with self._make_repo(rev_lock=None) as repo: + self.def_repo[self.PARAM_REV_LOCK] = repo.scm.get_rev() diff --git a/dvc/external_repo.py b/dvc/external_repo.py index 2bc2668f9d..20a4db4c84 100644 --- a/dvc/external_repo.py +++ b/dvc/external_repo.py @@ -1,17 +1,15 @@ from __future__ import unicode_literals import os -import shutil import logging -import shortuuid +import tempfile -from funcy import cached_property, retry -from schema import Optional +from funcy import retry +from contextlib import contextmanager from dvc.config import Config from dvc.cache import CacheConfig from dvc.exceptions import DvcException -from dvc.utils.compat import makedirs, str from dvc.utils import remove @@ -22,18 +20,10 @@ class ExternalRepoError(DvcException): pass -class NotInstalledError(ExternalRepoError): - def __init__(self, name): - super(NotInstalledError, self).__init__( - "Repo '{}' is not installed".format(name) - ) - - -class InstallError(ExternalRepoError): +class CloneError(ExternalRepoError): def __init__(self, url, path, cause): - super(InstallError, self).__init__( - "Failed to install repo '{}' to '{}'".format(url, path), - cause=cause, + super(CloneError, self).__init__( + "Failed to clone repo '{}' to '{}'".format(url, path), cause=cause ) @@ -45,117 +35,51 @@ def __init__(self, url, rev, cause): ) -class ExternalRepo(object): - REPOS_DIR = "repos" - - PARAM_URL = "url" - PARAM_VERSION = "rev" +def _clone(url=None, rev=None, rev_lock=None, cache_dir=None): + import git + from dvc.repo import Repo - SCHEMA = {Optional(PARAM_URL): str, Optional(PARAM_VERSION): str} + _path = tempfile.mkdtemp("dvc-repo") - def __init__(self, dvc_dir, **kwargs): - self.repos_dir = os.path.join(dvc_dir, self.REPOS_DIR) - self.url = kwargs[self.PARAM_URL] + try: + repo = git.Repo.clone_from(url, _path, no_single_branch=True) + except git.exc.GitCommandError as exc: + raise CloneError(url, _path, exc) - self.name = 
"{}-{}".format(os.path.basename(self.url), hash(self.url)) - - self.rev = kwargs.get(self.PARAM_VERSION) - self.path = os.path.join(self.repos_dir, self.name) + try: + revision = rev_lock or rev + if revision: + try: + repo.git.checkout(revision) + except git.exc.GitCommandError as exc: + raise RevError(url, revision, exc) + finally: + repo.close() - @cached_property - def repo(self): - from dvc.repo import Repo + if cache_dir: + repo = Repo(_path) + cache_config = CacheConfig(repo.config) + cache_config.set_dir(cache_dir, level=Config.LEVEL_LOCAL) + repo.scm.git.close() - if not self.installed: - raise NotInstalledError(self.name) + return Repo(_path) - return Repo(self.path, rev=self.rev) - @property - def installed(self): - return os.path.exists(self.path) +def _remove(repo): + repo.scm.git.close() - def _install_to(self, tmp_dir, cache_dir): - import git + if os.name == "nt": + # git.exe may hang for a while not permitting to remove temp dir + os_retry = retry(5, errors=OSError, timeout=0.1) + os_retry(remove)(repo.root_dir) + else: + remove(repo.root_dir) - try: - git.Repo.clone_from( - self.url, tmp_dir, depth=1, no_single_branch=True - ) - except git.exc.GitCommandError as exc: - raise InstallError(self.url, tmp_dir, exc) - if self.rev: - try: - repo = git.Repo(tmp_dir) - repo.git.checkout(self.rev) - repo.close() - except git.exc.GitCommandError as exc: - raise RevError(self.url, self.rev, exc) - - if cache_dir: - from dvc.repo import Repo - - repo = Repo(tmp_dir) - cache_config = CacheConfig(repo.config) - cache_config.set_dir(cache_dir, level=Config.LEVEL_LOCAL) - repo.scm.git.close() - - def install(self, cache_dir=None, force=False): - if self.installed and not force: - logger.info( - "Skipping installing '{}'('{}') as it is already " - "installed.".format(self.name, self.url) - ) - return - - makedirs(self.repos_dir, exist_ok=True) - - # installing package to a temporary directory until we are sure that - # it has been installed correctly. 
- # - # Note that we can't use tempfile.TemporaryDirectory is using symlinks - # to tmpfs, so we won't be able to use move properly. - tmp_dir = os.path.join(self.repos_dir, "." + str(shortuuid.uuid())) - try: - self._install_to(tmp_dir, cache_dir) - except ExternalRepoError: - if os.path.exists(tmp_dir): - remove(tmp_dir) - raise - - if self.installed: - self.uninstall() - - shutil.move(tmp_dir, self.path) - - def uninstall(self): - if not self.installed: - logger.info( - "Skipping uninstalling '{}' as it is not installed.".format( - self.name - ) - ) - return - - # If repo has been initialized then we need to close its git repo - if "repo" in self.__dict__: - self.repo.scm.git.close() - - if os.name == "nt": - # git.exe may hang for a while not permitting to remove temp dir - os_retry = retry(5, errors=OSError, timeout=0.1) - os_retry(remove)(self.path) - else: - remove(self.path) - - def update(self): - self.repo.scm.fetch(self.rev) - - def dumpd(self): - ret = {self.PARAM_URL: self.url} - - if self.rev: - ret[self.PARAM_VERSION] = self.rev - - return ret +@contextmanager +def external_repo(**kwargs): + repo = _clone(**kwargs) + try: + yield repo + finally: + _remove(repo) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index f4ac80595e..fc90f0ed71 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -44,8 +44,9 @@ class Repo(object): from dvc.repo.brancher import brancher from dvc.repo.get import get from dvc.repo.get_url import get_url + from dvc.repo.update import update - def __init__(self, root_dir=None, rev=None): + def __init__(self, root_dir=None): from dvc.state import State from dvc.lock import Lock from dvc.scm import SCM @@ -64,10 +65,7 @@ def __init__(self, root_dir=None, rev=None): self.scm = SCM(self.root_dir, repo=self) - if rev: - self.tree = self.scm.get_tree(rev) - else: - self.tree = WorkingTree(self.root_dir) + self.tree = WorkingTree(self.root_dir) self.lock = Lock(self.dvc_dir) # NOTE: storing state and link_state in the repository 
itself to avoid @@ -124,7 +122,6 @@ def unprotect(self, target): def _ignore(self): from dvc.updater import Updater - from dvc.external_repo import ExternalRepo updater = Updater(self.dvc_dir) @@ -134,7 +131,6 @@ def _ignore(self): self.config.config_local_file, updater.updater_file, updater.lock.lock_file, - os.path.join(self.dvc_dir, ExternalRepo.REPOS_DIR), ] + self.state.temp_files if self.cache.local.cache_dir.startswith(self.root_dir): @@ -330,6 +326,9 @@ def graph(self, stages=None, from_directory=None): G_active.add_node(node, stage=stage) for dep in stage.deps: + if dep.path_info is None: + continue + for out in outs: if ( out == dep.path_info @@ -441,6 +440,11 @@ def func(out): return matched + def find_out_by_relpath(self, relpath): + path = os.path.join(self.root_dir, relpath) + out, = self.find_outs_by_path(path) + return out + def is_dvc_internal(self, path): path_parts = os.path.normpath(path).split(os.path.sep) return self.DVC_DIR in path_parts diff --git a/dvc/repo/get.py b/dvc/repo/get.py index 0871b40232..a1e14326b2 100644 --- a/dvc/repo/get.py +++ b/dvc/repo/get.py @@ -3,7 +3,8 @@ from dvc.config import Config from dvc.path_info import PathInfo -from dvc.external_repo import ExternalRepo +from dvc.external_repo import external_repo +from dvc.utils import remove from dvc.utils.compat import urlparse @@ -18,28 +19,26 @@ def get(url, path, out=None, rev=None): # and won't work with reflink/hardlink. dpath = os.path.dirname(os.path.abspath(out)) tmp_dir = os.path.join(dpath, "." + str(shortuuid.uuid())) - erepo = ExternalRepo(tmp_dir, url=url, rev=rev) try: - erepo.install() - # Try any links possible to avoid data duplication. - # - # Not using symlink, because we need to remove cache after we are - # done, and to make that work we would have to copy data over - # anyway before removing the cache, so we might just copy it - # right away. 
- # - # Also, we can't use theoretical "move" link type here, because - # the same cache file might be used a few times in a directory. - erepo.repo.config.set( - Config.SECTION_CACHE, - Config.SECTION_CACHE_TYPE, - "reflink,hardlink,copy", - ) - src = os.path.join(erepo.path, urlparse(path).path.lstrip("/")) - o, = erepo.repo.find_outs_by_path(src) - erepo.repo.fetch(o.stage.path) - o.path_info = PathInfo(os.path.abspath(out)) - with o.repo.state: - o.checkout() + with external_repo(cache_dir=tmp_dir, url=url, rev=rev) as repo: + # Try any links possible to avoid data duplication. + # + # Not using symlink, because we need to remove cache after we are + # done, and to make that work we would have to copy data over + # anyway before removing the cache, so we might just copy it + # right away. + # + # Also, we can't use theoretical "move" link type here, because + # the same cache file might be used a few times in a directory. + repo.config.set( + Config.SECTION_CACHE, + Config.SECTION_CACHE_TYPE, + "reflink,hardlink,copy", + ) + o = repo.find_out_by_relpath(path) + repo.fetch(o.stage.path) + o.path_info = PathInfo(os.path.abspath(out)) + with o.repo.state: + o.checkout() finally: - erepo.uninstall() + remove(tmp_dir) diff --git a/dvc/repo/imp.py b/dvc/repo/imp.py index d0fcd716d4..68ead389b6 100644 --- a/dvc/repo/imp.py +++ b/dvc/repo/imp.py @@ -1,9 +1,6 @@ -from dvc.external_repo import ExternalRepo - - def imp(self, url, path, out=None, rev=None): - erepo = {ExternalRepo.PARAM_URL: url} + erepo = {"url": url} if rev is not None: - erepo[ExternalRepo.PARAM_VERSION] = rev + erepo["rev"] = rev - return self.imp_url(path, out=out, erepo=erepo) + return self.imp_url(path, out=out, erepo=erepo, locked=True) diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index e5c6d947c1..2b433b26e6 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -5,7 +5,9 @@ @scm_context -def imp_url(self, url, out=None, resume=False, fname=None, erepo=None): +def imp_url( + self, 
url, out=None, resume=False, fname=None, erepo=None, locked=False +): from dvc.stage import Stage default_out = os.path.basename(urlparse(url).path) @@ -29,6 +31,8 @@ def imp_url(self, url, out=None, resume=False, fname=None, erepo=None): with self.state: stage.run(resume=resume) + stage.locked = locked + stage.dump() return stage diff --git a/dvc/repo/update.py b/dvc/repo/update.py new file mode 100644 index 0000000000..1eb352c21e --- /dev/null +++ b/dvc/repo/update.py @@ -0,0 +1,9 @@ +def update(self, target): + from dvc.stage import Stage + + stage = Stage.load(self, target) + with self.state: + stage.update() + stage.save() + + stage.dump() diff --git a/dvc/scm/git/__init__.py b/dvc/scm/git/__init__.py index ed70e6e308..339e643392 100644 --- a/dvc/scm/git/__init__.py +++ b/dvc/scm/git/__init__.py @@ -319,3 +319,6 @@ def get_diff_trees(self, a_ref, b_ref=None): diff_dct[DIFF_A_TREE] = trees[DIFF_A_TREE] diff_dct[DIFF_B_TREE] = trees[DIFF_B_TREE] return diff_dct + + def get_rev(self): + return self.git.git.rev_parse("HEAD") diff --git a/dvc/stage.py b/dvc/stage.py index c88d5b9784..bb3229ec05 100644 --- a/dvc/stage.py +++ b/dvc/stage.py @@ -94,6 +94,14 @@ class StageCommitError(DvcException): pass +class StageUpdateError(DvcException): + def __init__(self, path): + super(StageUpdateError, self).__init__( + "update is not supported for '{}' that is not an " + "import.".format(path) + ) + + class MissingDep(DvcException): def __init__(self, deps): assert len(deps) > 0 @@ -326,6 +334,18 @@ def reproduce( return self + def update(self): + if not self.is_repo_import: + raise StageUpdateError(self.relpath) + + self.deps[0].update() + locked = self.locked + self.locked = False + try: + self.reproduce() + finally: + self.locked = locked + @staticmethod def validate(d, fname=None): from dvc.utils import convert_to_unicode diff --git a/scripts/completion/dvc.bash b/scripts/completion/dvc.bash index 7457371b53..18c64b6abd 100644 --- a/scripts/completion/dvc.bash +++ 
b/scripts/completion/dvc.bash @@ -10,7 +10,7 @@ _dvc_commands='add cache checkout commit config destroy diff fetch get-url get gc \ import-url import init install lock metrics move pipeline pull push \ - remote remove repro root run status unlock unprotect version' + remote remove repro root run status unlock unprotect version update' _dvc_options='-h --help -V --version' _dvc_global_options='-h --help -q --quiet -v --verbose' @@ -45,6 +45,7 @@ _dvc_status='--show-checksums -j --jobs -r --remote -a --all-branches -T --all-t _dvc_unlock='$(compgen -G *.dvc)' _dvc_unprotect='$(compgen -G *)' _dvc_version='' +_dvc_update='$(compgen -G *.dvc)' # Notes: # diff --git a/scripts/completion/dvc.zsh b/scripts/completion/dvc.zsh index 7a0dc702b7..06b961e740 100644 --- a/scripts/completion/dvc.zsh +++ b/scripts/completion/dvc.zsh @@ -42,6 +42,7 @@ _dvc_commands() { "unlock:Unlock DVC-file." "unprotect:Unprotect data file/directory." "version:Show DVC version and system/environment informaion." + "update:Update dependencies and reproduce specified DVC-files." 
) _describe 'dvc commands' _commands @@ -261,6 +262,10 @@ _dvc_unprotect=( "*:Data files:_files" ) +_dvc_update=( + "*:Stages:_files -g '(*.dvc|Dvcfile)'" +) + typeset -A opt_args local context state line curcontext="$curcontext" @@ -299,4 +304,5 @@ case $words[1] in status) _arguments $_dvc_global_options $_dvc_status ;; unlock) _arguments $_dvc_global_options $_dvc_unlock ;; unprotect) _arguments $_dvc_global_options $_dvc_unprotect ;; + update) _arguments $_dvc_global_options $_dvc_update ;; esac diff --git a/tests/func/test_update.py b/tests/func/test_update.py new file mode 100644 index 0000000000..9234b0ed43 --- /dev/null +++ b/tests/func/test_update.py @@ -0,0 +1,42 @@ +import os + +from dvc.repo import Repo + + +def test_update_import(dvc_repo, erepo): + src = "version" + dst = src + + stage = dvc_repo.imp(erepo.root_dir, src, dst, rev="branch") + + assert os.path.exists(dst) + assert os.path.isfile(dst) + with open(dst, "r+") as fobj: + assert fobj.read() == "branch" + + # update data + repo = Repo(erepo.root_dir) + + saved_dir = os.getcwd() + os.chdir(erepo.root_dir) + + repo.scm.checkout("branch") + os.unlink("version") + erepo.create("version", "updated") + repo.add("version") + repo.scm.add([".gitignore", "version.dvc"]) + repo.scm.commit("updated") + repo.scm.checkout("master") + + repo.scm.git.close() + + os.chdir(saved_dir) + + assert dvc_repo.status(stage.path) == {} + dvc_repo.update(stage.path) + assert dvc_repo.status(stage.path) == {} + + assert os.path.exists(dst) + assert os.path.isfile(dst) + with open(dst, "r+") as fobj: + assert fobj.read() == "updated" diff --git a/tests/unit/command/test_update.py b/tests/unit/command/test_update.py new file mode 100644 index 0000000000..bd741c825a --- /dev/null +++ b/tests/unit/command/test_update.py @@ -0,0 +1,15 @@ +from dvc.cli import parse_args +from dvc.command.update import CmdUpdate + + +def test_update(mocker): + targets = ["target1", "target2", "target3"] + cli_args = parse_args(["update"] + 
targets) + assert cli_args.func == CmdUpdate + cmd = cli_args.func(cli_args) + m = mocker.patch("dvc.repo.Repo.update") + + assert cmd.run() == 0 + + calls = [mocker.call(target) for target in targets] + m.assert_has_calls(calls) diff --git a/tests/unit/test_stage.py b/tests/unit/test_stage.py index 10d6b52f2a..c39a7f5676 100644 --- a/tests/unit/test_stage.py +++ b/tests/unit/test_stage.py @@ -1,5 +1,6 @@ from dvc.path_info import PathInfo -from dvc.stage import Stage +from dvc.stage import Stage, StageUpdateError +from dvc.dependency.repo import DependencyREPO import mock import pytest @@ -70,3 +71,22 @@ def test_stage_fname(add): out.path_info = PathInfo("path/to/out.txt") fname = Stage._stage_fname([out], add) assert fname == "out.txt.dvc" + + +def test_stage_update(mocker): + dep = DependencyREPO({"url": "example.com"}, None, None) + mocker.patch.object(dep, "update", return_value=None) + + stage = Stage(None, "path", deps=[dep]) + reproduce = mocker.patch.object(stage, "reproduce") + is_repo_import = mocker.patch( + __name__ + ".Stage.is_repo_import", new_callable=mocker.PropertyMock + ) + + is_repo_import.return_value = True + stage.update() + reproduce.assert_called_once_with() + + is_repo_import.return_value = False + with pytest.raises(StageUpdateError): + stage.update()