Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transfer: preserve workspace data #6860

Merged
merged 5 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dvc/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ def exists(self, path_info) -> bool:
return os.path.lexists(path_info)

def checksum(self, path_info) -> str:
return self.fs.checksum(path_info)
from fsspec.utils import tokenize

st = os.stat(path_info)

return str(int(tokenize([st.st_ino, st.st_mtime, st.st_size]), 16))

def isfile(self, path_info) -> bool:
return os.path.isfile(path_info)
Expand Down
51 changes: 35 additions & 16 deletions dvc/fs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,48 @@
logger = logging.getLogger(__name__)


def _link(
    from_fs: "BaseFileSystem",
    from_info: "AnyPath",
    to_fs: "BaseFileSystem",
    to_info: "DvcPath",
    hardlink: bool = False,
) -> None:
    """Link ``from_info`` to ``to_info`` instead of copying the data.

    ``reflink`` is always attempted first; ``hardlink`` is attempted as a
    fallback only when the ``hardlink`` flag is true. A link type is skipped
    when ``to_fs`` has no such method, when the method raises
    ``RemoteActionNotImplemented``, or when it raises ``OSError`` with
    ``errno.EXDEV`` / ``errno.ENOTSUP``.

    Args:
        from_fs: filesystem that holds the source.
        from_info: path of the source.
        to_fs: filesystem that receives the link; must be of a type that
            ``from_fs`` is an instance of.
        to_info: path of the link to create.
        hardlink: allow falling back to a hardlink when reflink is not
            available.

    Raises:
        OSError: with ``errno.EXDEV`` when the two filesystems are of
            different types, with ``errno.ENOTSUP`` when none of the
            candidate link types worked, or any other ``OSError`` raised by
            the link call itself.
    """
    # TODO: unify this with the linking logic used by checkout in a
    # follow-up; doing so was out of scope for the change that added this
    # helper.
    if not isinstance(from_fs, type(to_fs)):
        raise OSError(errno.EXDEV, "can't link across filesystems")

    # NOTE: the parentheses are required. Without them the conditional
    # expression applies to the whole concatenation, which yields an empty
    # list (and therefore no reflink attempt) whenever `hardlink` is false.
    links = ["reflink"] + (["hardlink"] if hardlink else [])
    for link in links:
        try:
            func = getattr(to_fs, link)
        except AttributeError:
            # This filesystem does not implement the link type at all.
            continue

        try:
            return func(from_info, to_info)
        except RemoteActionNotImplemented:
            continue
        except OSError as exc:
            # Only "cross-device" and "not supported" mean "try the next
            # link type"; anything else is a real failure.
            if exc.errno not in [errno.EXDEV, errno.ENOTSUP]:
                raise

    raise OSError(errno.ENOTSUP, "reflink and hardlink are not supported")


def transfer(
from_fs: "BaseFileSystem",
from_info: "AnyPath",
to_fs: "BaseFileSystem",
to_info: "DvcPath",
move: bool = False,
hardlink: bool = False,
) -> None:
same_fs = isinstance(from_fs, type(to_fs))
use_move = same_fs and move
try:
if use_move:
return to_fs.move(from_info, to_info)

if same_fs:
try:
return from_fs.reflink(from_info, to_info)
except RemoteActionNotImplemented:
pass
except OSError as exc:
if exc.errno not in [errno.EXDEV, errno.ENOTSUP]:
raise
try:
return _link(from_fs, from_info, to_fs, to_info, hardlink=hardlink)
except OSError as exc:
if exc.errno not in [errno.EXDEV, errno.ENOTSUP]:
raise

if isinstance(from_fs, LocalFileSystem):
if not isinstance(from_info, from_fs.PATH_CLS):
Expand All @@ -61,8 +82,6 @@ def transfer(
and isinstance(exc.__context__, FileExistsError)
):
logger.debug("'%s' file already exists, skipping", to_info)
if use_move:
from_fs.remove(from_info)
return None

raise
42 changes: 33 additions & 9 deletions dvc/objects/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ def _link(cache, from_info, to_info):
raise CheckoutError([str(to_info)]) from exc


def _cache_is_copy(cache, path_info):
def _confirm_cache_type(cache, path_info):
"""Checks whether cache uses copies."""
if cache.cache_type_confirmed:
return cache.cache_types[0] == "copy"
return

if set(cache.cache_types) <= {"copy"}:
return True
return

workspace_file = path_info.with_name("." + uuid())
test_cache_file = cache.path_info / ".cache_type_test_file"
Expand All @@ -118,7 +118,16 @@ def _cache_is_copy(cache, path_info):
cache.fs.remove(test_cache_file)

cache.cache_type_confirmed = True
return cache.cache_types[0] == "copy"


def _relink(cache, cache_info, fs, path_info, in_cache, force):
_remove(path_info, fs, in_cache, force=force)
Copy link
Contributor Author

@efiop efiop Oct 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing we could do here is move the original file aside (e.g. to .data.back) so that even if _link fails, we could bring it back. There is clearly a storage cost to that, but since we don't relink copies anyway, it should actually be fine. It needs to be done carefully, though, so I will look into it after this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing we could do here is move the original file aside (e.g. to .data.back) so that even if _link fails, we could bring it back. There is clearly a storage cost to that, but since we don't relink copies anyway, it should actually be fine. It needs to be done carefully, though, so I will look into it after this PR.

But here we only create a link, which would not cost much storage. Even in the worst case, the overall storage cost would be only slightly higher than before the operation.

Btw,

def _do_link(cache, from_info, to_info, link_method):
    if cache.fs.exists(to_info):
        cache.fs.remove(to_info)  # broken symlink
    link_method(from_info, to_info)
    logger.debug(
        "Created '%s': %s -> %s", cache.cache_types[0], from_info, to_info
    )

Looks similar to it.

_link(cache, cache_info, path_info)
# NOTE: Depending on a file system (e.g. on NTFS), `_remove` might reset
# read-only permissions in order to delete a hardlink to protected object,
# which will also reset it for the object itself, making it unprotected,
# so we need to protect it back.
cache.protect(cache_info)


def _checkout_file(
Expand All @@ -133,18 +142,33 @@ def _checkout_file(
):
"""The file is changed we need to checkout a new copy"""
modified = False

_confirm_cache_type(cache, path_info)

cache_info = cache.hash_to_path_info(change.new.oid.value)
if change.old.oid:
if relink:
if fs.iscopy(path_info) and _cache_is_copy(cache, path_info):
if fs.iscopy(path_info) and cache.cache_types[0] == "copy":
cache.unprotect(path_info)
else:
_remove(path_info, fs, change.old.in_cache, force=force)
_link(cache, cache_info, path_info)
_relink(
cache,
cache_info,
fs,
path_info,
change.old.in_cache,
force=force,
)
else:
modified = True
_remove(path_info, fs, change.old.in_cache, force=force)
_link(cache, cache_info, path_info)
_relink(
cache,
cache_info,
fs,
path_info,
change.old.in_cache,
force=force,
)
else:
_link(cache, cache_info, path_info)
modified = True
Expand Down
12 changes: 8 additions & 4 deletions dvc/objects/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,25 @@ def _add_file(
from_info: "AnyPath",
to_info: "DvcPath",
_hash_info: "HashInfo",
move: bool = False,
hardlink: bool = False,
):
from dvc import fs

self.makedirs(to_info.parent)
return fs.utils.transfer(
from_fs, from_info, self.fs, to_info, move=move
from_fs,
from_info,
self.fs,
to_info,
hardlink=hardlink,
)

def add(
self,
path_info: "AnyPath",
fs: "BaseFileSystem",
hash_info: "HashInfo",
move: bool = True,
hardlink: bool = False,
verify: Optional[bool] = None,
):
if self.read_only:
Expand All @@ -110,7 +114,7 @@ def add(
pass

cache_info = self.hash_to_path_info(hash_info.value)
self._add_file(fs, path_info, cache_info, hash_info, move=move)
self._add_file(fs, path_info, cache_info, hash_info, hardlink=hardlink)

try:
if verify:
Expand Down
8 changes: 6 additions & 2 deletions dvc/objects/db/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ def _add_file(
from_info: "AnyPath",
to_info: "DvcPath",
hash_info: "HashInfo",
move: bool = False,
hardlink: bool = False,
):
self.makedirs(to_info.parent)
if hash_info.isdir:
return super()._add_file(
from_fs, from_info, to_info, hash_info, move
from_fs,
from_info,
to_info,
hash_info,
hardlink=hardlink,
)
ref_file = ReferenceHashFile(from_info, from_fs, hash_info)
self._obj_cache[hash_info] = ref_file
Expand Down
4 changes: 2 additions & 2 deletions dvc/objects/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def _stage_file(path_info, fs, name, odb=None, upload_odb=None, dry_run=False):
if dry_run:
obj = HashFile(path_info, fs, hash_info)
else:
odb.add(path_info, fs, hash_info, move=False)
odb.add(path_info, fs, hash_info, hardlink=False)
obj = odb.get(hash_info)

return path_info, meta, obj
Expand Down Expand Up @@ -175,7 +175,7 @@ def _stage_tree(path_info, fs, fs_info, name, odb=None, **kwargs):
path_info, fs
)
tree.digest(hash_info=hash_info)
odb.add(tree.path_info, tree.fs, tree.hash_info, move=True)
odb.add(tree.path_info, tree.fs, tree.hash_info, hardlink=False)
raw = odb.get(tree.hash_info)
# cleanup unneeded memfs tmpfile and return tree based on the
# ODB fs/path
Expand Down
8 changes: 6 additions & 2 deletions dvc/objects/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def transfer(
obj_ids: Iterable["HashInfo"],
jobs: Optional[int] = None,
verify: bool = False,
move: bool = False,
hardlink: bool = False,
**kwargs,
) -> int:
"""Transfer (copy) the specified objects from one ODB to another.
Expand All @@ -157,7 +157,11 @@ def transfer(
def func(hash_info: "HashInfo") -> None:
obj = src.get(hash_info)
return dest.add(
obj.path_info, obj.fs, obj.hash_info, verify=verify, move=move
obj.path_info,
obj.fs,
obj.hash_info,
verify=verify,
hardlink=hardlink,
)

total = len(status.new)
Expand Down
6 changes: 3 additions & 3 deletions dvc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ def commit(self, filter_info=None):
self.odb,
{obj.hash_info},
shallow=False,
move=True,
hardlink=True,
)
checkout(
filter_info or self.path_info,
Expand Down Expand Up @@ -629,7 +629,7 @@ def _commit_granular_dir(self, filter_info):
self.odb,
{save_obj.hash_info} | {oid for _, _, oid in save_obj},
shallow=True,
move=True,
hardlink=True,
)
return checkout_obj

Expand Down Expand Up @@ -826,7 +826,7 @@ def transfer(
odb,
{obj.hash_info},
jobs=jobs,
move=upload,
hardlink=False,
shallow=False,
)

Expand Down
30 changes: 29 additions & 1 deletion dvc/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,42 @@
import logging
import os
import platform
import stat
import sys

logger = logging.getLogger(__name__)


if os.name == "nt" and sys.version_info < (3, 8):
# NOTE: using backports for `os.path.realpath`
# See https://bugs.python.org/issue9949 for more info.
# pylint: disable=import-error, no-name-in-module
from jaraco.windows.filesystem.backports import realpath as _realpath

def realpath(path):
return _realpath(os.fspath(path))


else:
realpath = os.path.realpath


class System:
@staticmethod
def hardlink(source, link_name):
os.link(source, link_name)
# NOTE: we should really be using `os.link()` here with
# `follow_symlinks=True`, but unfortunately the implementation is
# buggy across platforms, so until it is fixed, we just dereference
# the symlink ourselves here.
#
# See https://bugs.python.org/issue41355 for more info.
st = os.lstat(source)
if stat.S_ISLNK(st.st_mode):
src = realpath(source)
else:
src = source

os.link(src, link_name)

@staticmethod
def symlink(source, link_name):
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ install_requires =
fsspec[http]>=2021.10.1
aiohttp-retry>=2.4.5
diskcache>=5.2.1
jaraco.windows>=5.7.0; python_version < '3.8' and sys_platform == 'win32'

[options.extras_require]
# gssapi should not be included in all_remotes, because it doesn't have wheels
Expand Down Expand Up @@ -139,7 +140,6 @@ tests =
Pygments==2.10.0
collective.checkdocs==0.2
pydocstyle==6.1.1
jaraco.windows==5.7.0
# pylint requirements
pylint==2.11.1
# we use this to suppress pytest-related false positives in our tests.
Expand Down
Loading