Skip to content

Commit

Permalink
staging: don't use move
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed Oct 27, 2021
1 parent 15b1169 commit ffa7650
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 45 deletions.
51 changes: 35 additions & 16 deletions dvc/fs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,48 @@
logger = logging.getLogger(__name__)


def _link(
    from_fs: "BaseFileSystem",
    from_info: "AnyPath",
    to_fs: "BaseFileSystem",
    to_info: "DvcPath",
    hardlink: bool = False,
) -> None:
    """Link ``from_info`` to ``to_info`` instead of copying the data.

    A reflink is always attempted first; a hardlink is attempted afterwards
    only when ``hardlink`` is true. A link type is skipped when ``to_fs`` has
    no method of that name, when the method raises
    ``RemoteActionNotImplemented``, or when it fails with an ``OSError``
    whose errno is ``EXDEV`` or ``ENOTSUP``.

    Returns:
        Whatever the first successful link method of ``to_fs`` returns.

    Raises:
        OSError: with ``errno.EXDEV`` when ``from_fs`` is not an instance of
            the type of ``to_fs``; with ``errno.ENOTSUP`` when none of the
            attempted link types succeeded; or any other ``OSError`` raised
            by a link method (re-raised unchanged).
    """
    if not isinstance(from_fs, type(to_fs)):
        raise OSError(errno.EXDEV, "can't link across filesystems")

    # The conditional expression has the lowest precedence, so it must be
    # parenthesized: without the parentheses the whole list (including
    # "reflink") collapses to [] whenever ``hardlink`` is false and no link
    # type is ever attempted.
    links = ["reflink"] + (["hardlink"] if hardlink else [])
    for link in links:
        try:
            func = getattr(to_fs, link)
        except AttributeError:
            # This filesystem does not provide this link type at all.
            continue

        try:
            return func(from_info, to_info)
        except RemoteActionNotImplemented:
            continue
        except OSError as exc:
            # EXDEV / ENOTSUP only mean "this link type can't be used here";
            # fall through to the next one. Anything else is a real failure.
            if exc.errno not in (errno.EXDEV, errno.ENOTSUP):
                raise

    raise OSError(errno.ENOTSUP, "reflink and hardlink are not supported")


def transfer(
from_fs: "BaseFileSystem",
from_info: "AnyPath",
to_fs: "BaseFileSystem",
to_info: "DvcPath",
move: bool = False,
hardlink: bool = False,
) -> None:
same_fs = isinstance(from_fs, type(to_fs))
use_move = same_fs and move
try:
if use_move:
return to_fs.move(from_info, to_info)

if same_fs:
try:
return from_fs.reflink(from_info, to_info)
except RemoteActionNotImplemented:
pass
except OSError as exc:
if exc.errno not in [errno.EXDEV, errno.ENOTSUP]:
raise
try:
return _link(from_fs, from_info, to_fs, to_info, hardlink=hardlink)
except OSError as exc:
if exc.errno not in [errno.EXDEV, errno.ENOTSUP]:
raise

if isinstance(from_fs, LocalFileSystem):
if not isinstance(from_info, from_fs.PATH_CLS):
Expand All @@ -61,8 +82,6 @@ def transfer(
and isinstance(exc.__context__, FileExistsError)
):
logger.debug("'%s' file already exists, skipping", to_info)
if use_move:
from_fs.remove(from_info)
return None

raise
12 changes: 8 additions & 4 deletions dvc/objects/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,25 @@ def _add_file(
from_info: "AnyPath",
to_info: "DvcPath",
_hash_info: "HashInfo",
move: bool = False,
hardlink: bool = False,
):
from dvc import fs

self.makedirs(to_info.parent)
return fs.utils.transfer(
from_fs, from_info, self.fs, to_info, move=move
from_fs,
from_info,
self.fs,
to_info,
hardlink=hardlink,
)

def add(
self,
path_info: "AnyPath",
fs: "BaseFileSystem",
hash_info: "HashInfo",
move: bool = True,
hardlink: bool = False,
verify: Optional[bool] = None,
):
if self.read_only:
Expand All @@ -110,7 +114,7 @@ def add(
pass

cache_info = self.hash_to_path_info(hash_info.value)
self._add_file(fs, path_info, cache_info, hash_info, move=move)
self._add_file(fs, path_info, cache_info, hash_info, hardlink=hardlink)

try:
if verify:
Expand Down
8 changes: 6 additions & 2 deletions dvc/objects/db/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ def _add_file(
from_info: "AnyPath",
to_info: "DvcPath",
hash_info: "HashInfo",
move: bool = False,
hardlink: bool = False,
):
self.makedirs(to_info.parent)
if hash_info.isdir:
return super()._add_file(
from_fs, from_info, to_info, hash_info, move
from_fs,
from_info,
to_info,
hash_info,
hardlink=hardlink,
)
ref_file = ReferenceHashFile(from_info, from_fs, hash_info)
self._obj_cache[hash_info] = ref_file
Expand Down
4 changes: 2 additions & 2 deletions dvc/objects/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def _stage_file(path_info, fs, name, odb=None, upload_odb=None, dry_run=False):
if dry_run:
obj = HashFile(path_info, fs, hash_info)
else:
odb.add(path_info, fs, hash_info, move=False)
odb.add(path_info, fs, hash_info, hardlink=False)
obj = odb.get(hash_info)

return path_info, meta, obj
Expand Down Expand Up @@ -175,7 +175,7 @@ def _stage_tree(path_info, fs, fs_info, name, odb=None, **kwargs):
path_info, fs
)
tree.digest(hash_info=hash_info)
odb.add(tree.path_info, tree.fs, tree.hash_info, move=True)
odb.add(tree.path_info, tree.fs, tree.hash_info, hardlink=False)
raw = odb.get(tree.hash_info)
# cleanup unneeded memfs tmpfile and return tree based on the
# ODB fs/path
Expand Down
8 changes: 6 additions & 2 deletions dvc/objects/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def transfer(
obj_ids: Iterable["HashInfo"],
jobs: Optional[int] = None,
verify: bool = False,
move: bool = False,
hardlink: bool = False,
**kwargs,
) -> int:
"""Transfer (copy) the specified objects from one ODB to another.
Expand All @@ -157,7 +157,11 @@ def transfer(
def func(hash_info: "HashInfo") -> None:
obj = src.get(hash_info)
return dest.add(
obj.path_info, obj.fs, obj.hash_info, verify=verify, move=move
obj.path_info,
obj.fs,
obj.hash_info,
verify=verify,
hardlink=hardlink,
)

total = len(status.new)
Expand Down
6 changes: 3 additions & 3 deletions dvc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ def commit(self, filter_info=None):
self.odb,
{obj.hash_info},
shallow=False,
move=True,
hardlink=True,
)
checkout(
filter_info or self.path_info,
Expand Down Expand Up @@ -629,7 +629,7 @@ def _commit_granular_dir(self, filter_info):
self.odb,
{save_obj.hash_info} | {oid for _, _, oid in save_obj},
shallow=True,
move=True,
hardlink=True,
)
return checkout_obj

Expand Down Expand Up @@ -826,7 +826,7 @@ def transfer(
odb,
{obj.hash_info},
jobs=jobs,
move=upload,
hardlink=False,
shallow=False,
)

Expand Down
18 changes: 9 additions & 9 deletions tests/func/test_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,27 +496,27 @@ def test_should_update_state_entry_for_directory_after_add(

ret = main(["add", "data"])
assert ret == 0
assert file_md5_counter.mock.call_count == 4
assert file_md5_counter.mock.call_count == 3

ret = main(["status"])
assert ret == 0
assert file_md5_counter.mock.call_count == 4
assert file_md5_counter.mock.call_count == 3

ls = "dir" if os.name == "nt" else "ls"
ret = main(
["run", "--single-stage", "-d", "data", "{} {}".format(ls, "data")]
)
assert ret == 0
assert file_md5_counter.mock.call_count == 4
assert file_md5_counter.mock.call_count == 3

os.rename("data", "data" + ".back")
ret = main(["checkout"])
assert ret == 0
assert file_md5_counter.mock.call_count == 4
assert file_md5_counter.mock.call_count == 3

ret = main(["status"])
assert ret == 0
assert file_md5_counter.mock.call_count == 4
assert file_md5_counter.mock.call_count == 3


class TestAddCommit(TestDvc):
Expand All @@ -537,15 +537,15 @@ def test_should_collect_dir_cache_only_once(mocker, tmp_dir, dvc):
counter = mocker.spy(dvc_module.objects.stage, "_stage_tree")
ret = main(["add", "data"])
assert ret == 0
assert counter.mock.call_count == 2
assert counter.mock.call_count == 1

ret = main(["status"])
assert ret == 0
assert counter.mock.call_count == 2
assert counter.mock.call_count == 1

ret = main(["status"])
assert ret == 0
assert counter.mock.call_count == 2
assert counter.mock.call_count == 1


class TestShouldPlaceStageInDataDirIfRepositoryBelowSymlink(TestDvc):
Expand Down Expand Up @@ -964,7 +964,7 @@ def test_add_with_cache_link_error(tmp_dir, dvc, mocker, capsys):
err = capsys.readouterr()[1]
assert "reconfigure cache types" in err

assert not (tmp_dir / "foo").exists()
assert (tmp_dir / "foo").exists()
assert (tmp_dir / "foo.dvc").exists()
assert (tmp_dir / ".dvc" / "cache").read_text() == {
"ac": {"bd18db4cc2f85cedef654fccc4a4d8": "foo"}
Expand Down
2 changes: 1 addition & 1 deletion tests/func/test_external_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def test_subrepos_are_ignored(tmp_dir, erepo_dir):
repo.odb.local,
{obj.hash_info},
shallow=False,
move=True,
hardlink=True,
)
assert set(cache_dir.glob("??/*")) == {
cache_dir / "e1" / "d9e8eae5374860ae025ec84cfd85c7.dir",
Expand Down
12 changes: 6 additions & 6 deletions tests/unit/objects/db/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ def test_staging_file(tmp_dir, dvc):
check(local_odb, obj)
check(staging_odb, obj)

transfer(staging_odb, local_odb, {obj.hash_info}, move=True)
transfer(staging_odb, local_odb, {obj.hash_info}, hardlink=True)
check(local_odb, obj)
with pytest.raises(FileNotFoundError):
check(staging_odb, obj)
check(staging_odb, obj)

path_info = local_odb.hash_to_path_info(obj.hash_info.value)
assert fs.exists(path_info)
Expand All @@ -130,10 +129,11 @@ def test_staging_dir(tmp_dir, dvc):
check(local_odb, obj)
check(staging_odb, obj)

transfer(staging_odb, local_odb, {obj.hash_info}, shallow=False, move=True)
transfer(
staging_odb, local_odb, {obj.hash_info}, shallow=False, hardlink=True
)
check(local_odb, obj)
with pytest.raises(FileNotFoundError):
check(staging_odb, obj)
check(staging_odb, obj)

path_info = local_odb.hash_to_path_info(obj.hash_info.value)
assert fs.exists(path_info)

0 comments on commit ffa7650

Please sign in to comment.