diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py
index 0fceabb4aa..0b594f182d 100644
--- a/dvc/data_cloud.py
+++ b/dvc/data_cloud.py
@@ -48,12 +48,7 @@ def _init_remote(self, name):
         return Remote(self.repo, name=name)
 
     def push(
-        self,
-        cache,
-        jobs=None,
-        remote=None,
-        show_checksums=False,
-        run_cache=False,
+        self, cache, jobs=None, remote=None, show_checksums=False,
     ):
         """Push data items in a cloud-agnostic way.
 
@@ -67,20 +62,12 @@ def push(
         """
         remote = self.get_remote(remote, "push")
 
-        if run_cache:
-            self.repo.stage_cache.push(remote)
-
         return self.repo.cache.local.push(
             cache, jobs=jobs, remote=remote, show_checksums=show_checksums,
         )
 
     def pull(
-        self,
-        cache,
-        jobs=None,
-        remote=None,
-        show_checksums=False,
-        run_cache=False,
+        self, cache, jobs=None, remote=None, show_checksums=False,
     ):
         """Pull data items in a cloud-agnostic way.
 
@@ -94,9 +81,6 @@ def pull(
         """
         remote = self.get_remote(remote, "pull")
 
-        if run_cache:
-            self.repo.stage_cache.pull(remote)
-
         downloaded_items_num = self.repo.cache.local.pull(
             cache, jobs=jobs, remote=remote, show_checksums=show_checksums
         )
diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py
index 88618cf49e..f90a22e0c9 100644
--- a/dvc/repo/__init__.py
+++ b/dvc/repo/__init__.py
@@ -248,6 +248,7 @@ def used_cache(
         force=False,
         jobs=None,
         recursive=False,
+        used_run_cache=None,
     ):
         """Get the stages related to the given target and collect
         the `info` of its outputs.
@@ -291,6 +292,12 @@ def used_cache(
                 )
                 cache.update(used_cache, suffix=suffix)
 
+        if used_run_cache:
+            used_cache = self.stage_cache.get_used_cache(
+                used_run_cache, remote=remote, force=force, jobs=jobs,
+            )
+            cache.update(used_cache)
+
         return cache
 
     def _collect_graph(self, stages):
diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py
index efbd93beba..cf2a01b763 100644
--- a/dvc/repo/fetch.py
+++ b/dvc/repo/fetch.py
@@ -34,6 +34,9 @@ def _fetch(
         config.NoRemoteError: thrown when downloading only local files and no
             remote is configured
     """
+
+    used_run_cache = self.stage_cache.pull(remote) if run_cache else []
+
     used = self.used_cache(
         targets,
         all_branches=all_branches,
@@ -44,6 +47,7 @@ def _fetch(
         remote=remote,
         jobs=jobs,
         recursive=recursive,
+        used_run_cache=used_run_cache,
     )
 
     downloaded = 0
@@ -51,11 +55,7 @@ def _fetch(
 
     try:
         downloaded += self.cloud.pull(
-            used,
-            jobs,
-            remote=remote,
-            show_checksums=show_checksums,
-            run_cache=run_cache,
+            used, jobs, remote=remote, show_checksums=show_checksums,
         )
     except NoRemoteError:
         if not used.external and used["local"]:
diff --git a/dvc/repo/push.py b/dvc/repo/push.py
index b98714338b..81e07ce6ac 100644
--- a/dvc/repo/push.py
+++ b/dvc/repo/push.py
@@ -14,6 +14,8 @@ def push(
     all_commits=False,
     run_cache=False,
 ):
+    used_run_cache = self.stage_cache.push(remote) if run_cache else []
+
     used = self.used_cache(
         targets,
         all_branches=all_branches,
@@ -24,6 +26,7 @@ def push(
         remote=remote,
         jobs=jobs,
         recursive=recursive,
+        used_run_cache=used_run_cache,
     )
 
-    return self.cloud.push(used, jobs, remote=remote, run_cache=run_cache)
+    return self.cloud.push(used, jobs, remote=remote)
diff --git a/dvc/stage/cache.py b/dvc/stage/cache.py
index af6528f51f..c383d8725e 100644
--- a/dvc/stage/cache.py
+++ b/dvc/stage/cache.py
@@ -106,9 +106,11 @@ def restore(self, stage):
 
     @staticmethod
     def _transfer(func, from_remote, to_remote):
+        ret = []
+
         runs = from_remote.path_info / "runs"
         if not from_remote.exists(runs):
-            return
+            return []
 
         for src in from_remote.walk_files(runs):
             rel = src.relative_to(from_remote.path_info)
@@ -118,9 +120,36 @@ def _transfer(func, from_remote, to_remote):
             if to_remote.exists(key) and first(to_remote.walk_files(key)):
                 continue
             func(src, dst)
+            ret.append((src.parent.name, src.name))
+
+        return ret
 
     def push(self, remote):
+        remote = self.repo.cloud.get_remote(remote)
         return self._transfer(remote.upload, self.repo.cache.local, remote)
 
     def pull(self, remote):
+        remote = self.repo.cloud.get_remote(remote)
         return self._transfer(remote.download, remote, self.repo.cache.local)
+
+    def get_used_cache(self, used_run_cache, *args, **kwargs):
+        from dvc.cache import NamedCache
+        from dvc.stage import create_stage, PipelineStage
+
+        cache = NamedCache()
+
+        for key, value in used_run_cache:
+            entry = self._load_cache(key, value)
+            if not entry:
+                continue
+            stage = create_stage(
+                PipelineStage,
+                repo=self.repo,
+                path="dvc.yaml",
+                cmd=entry["cmd"],
+                deps=[dep["path"] for dep in entry["deps"]],
+                outs=[out["path"] for out in entry["outs"]],
+            )
+            StageLoader.fill_from_lock(stage, entry)
+            cache.update(stage.get_used_cache(*args, **kwargs))
+        return cache