Fix ignore tests on windows (#967)
* Add filter classes and tests

Signed-off-by: Tim Bauer <[email protected]>

* Rename to ignore

Signed-off-by: Tim Bauer <[email protected]>

* Remove old filter test

Signed-off-by: Tim Bauer <[email protected]>

* Create archive function and test

Signed-off-by: Tim Bauer <[email protected]>

* Refactor to use archive function

Signed-off-by: Tim Bauer <[email protected]>

* Lint

Signed-off-by: Tim Bauer <[email protected]>

* Add alternative version with ls-files

Signed-off-by: Tim Bauer <[email protected]>

* Add benchmark test

Signed-off-by: Tim Bauer <[email protected]>

* Adjust tests to ignore included empty dirs

Signed-off-by: Tim Bauer <[email protected]>

* Remove old version and benchmark test

Signed-off-by: Tim Bauer <[email protected]>

* Add list file functionality

Signed-off-by: Tim Bauer <[email protected]>

* Refactor digest as part of create archive

Signed-off-by: Tim Bauer <[email protected]>

* Remove unused code

Signed-off-by: Tim Bauer <[email protected]>

* Add test for digest

Signed-off-by: Tim Bauer <[email protected]>

* Include own digest and refactor to fast_package

Signed-off-by: Tim Bauer <[email protected]>

* Clean up rebase, integrate fast_package to package

Signed-off-by: Tim Bauer <[email protected]>

* Include recursive check for empty dirs

Signed-off-by: Tim Bauer <[email protected]>

* Remove obsolete checksumdir, add docker-py

Signed-off-by: Tim Bauer <[email protected]>

* Dict key lookup for performance increase

Signed-off-by: Tim Bauer <[email protected]>

* Fix requirements

Signed-off-by: Tim Bauer <[email protected]>

* Adjust click echos

Signed-off-by: Tim Bauer <[email protected]>

* re-compile dependencies

Signed-off-by: Tim Bauer <[email protected]>

* Fix lint

Signed-off-by: Tim Bauer <[email protected]>

* Remove obsolete import

Signed-off-by: Tim Bauer <[email protected]>

* Fix tests on windows

Signed-off-by: Eduardo Apolinario <[email protected]>

Co-authored-by: Tim Bauer <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
3 people authored Apr 21, 2022
1 parent 02973b7 commit 8e8d08c
Showing 12 changed files with 546 additions and 106 deletions.
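In short, this commit replaces the hard-coded tar filter and the checksumdir-based digest with a single fast_package helper driven by new ignore classes (GitIgnore, DockerIgnore, StandardIgnore). A minimal usage sketch, assuming only the call signature visible in the fast_registration.py diff below ("my_project" and "dist" are placeholder paths):

from flytekit.tools.fast_registration import fast_package

# Tars up everything under my_project/ that is not excluded by .gitignore,
# .dockerignore, or the standard ignore patterns; the archive is written to
# dist/fast<md5-digest>.tar.gz and its path is returned.
archive_path = fast_package("my_project", "dist")
print(archive_path)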
17 changes: 13 additions & 4 deletions dev-requirements.txt
@@ -45,10 +45,6 @@ charset-normalizer==2.0.12
     # via
     #   -c requirements.txt
     #   requests
-checksumdir==1.2.0
-    # via
-    #   -c requirements.txt
-    #   flytekit
 click==8.1.2
     # via
     #   -c requirements.txt
@@ -102,6 +98,14 @@ docker-image-py==0.1.12
     # via
     #   -c requirements.txt
     #   flytekit
+docker-py==1.10.6
+    # via
+    #   -c requirements.txt
+    #   flytekit
+docker-pycreds==0.4.0
+    # via
+    #   -c requirements.txt
+    #   docker-py
 dockerpty==0.4.1
     # via docker-compose
 docopt==0.6.2
@@ -346,6 +350,7 @@ requests==2.27.1
     #   cookiecutter
     #   docker
     #   docker-compose
+    #   docker-py
     #   flytekit
     #   google-api-core
     #   google-cloud-bigquery
@@ -365,6 +370,8 @@ six==1.16.0
     #   -c requirements.txt
     #   bcrypt
     #   cookiecutter
+    #   docker-py
+    #   docker-pycreds
     #   dockerpty
     #   google-auth
     #   grpcio
@@ -414,8 +421,10 @@ virtualenv==20.14.0
     # via pre-commit
 websocket-client==0.59.0
     # via
+    #   -c requirements.txt
     #   docker
     #   docker-compose
+    #   docker-py
 wheel==0.37.1
     # via
     #   -c requirements.txt
10 changes: 8 additions & 2 deletions doc-requirements.txt
@@ -56,8 +56,6 @@ chardet==4.0.0
     # via binaryornot
 charset-normalizer==2.0.12
     # via requests
-checksumdir==1.2.0
-    # via flytekit
 click==8.1.2
     # via
     #   cookiecutter
@@ -99,6 +97,10 @@ diskcache==5.4.0
     # via flytekit
 docker-image-py==0.1.12
     # via flytekit
+docker-py==1.10.6
+    # via flytekit
+docker-pycreds==0.4.0
+    # via docker-py
 docstring-parser==0.13
     # via flytekit
 docutils==0.17.1
@@ -528,6 +530,7 @@ regex==2022.3.15
 requests==2.27.1
     # via
     #   cookiecutter
+    #   docker-py
     #   flytekit
     #   google-api-core
     #   google-cloud-bigquery
@@ -571,6 +574,8 @@ six==1.16.0
     #   asttokens
     #   bleach
     #   cookiecutter
+    #   docker-py
+    #   docker-pycreds
     #   google-auth
     #   grpcio
     #   imagehash
@@ -713,6 +718,7 @@ webencodings==0.5.1
     #   bleach
     #   tinycss2
 websocket-client==1.3.2
+    # via docker-py
     # via kubernetes
 wheel==0.37.1
     # via flytekit
12 changes: 3 additions & 9 deletions flytekit/clis/sdk_in_container/serialize.py
@@ -1,6 +1,5 @@
 import os
 import sys
-import tarfile as _tarfile
 import typing
 from enum import Enum as _Enum
 
@@ -10,8 +9,7 @@
 from flytekit.clis.sdk_in_container.constants import CTX_PACKAGES
 from flytekit.configuration import FastSerializationSettings, ImageConfig, SerializationSettings
 from flytekit.exceptions.scopes import system_entry_point
-from flytekit.tools.fast_registration import compute_digest as _compute_digest
-from flytekit.tools.fast_registration import filter_tar_file_fn as _filter_tar_file_fn
+from flytekit.tools.fast_registration import fast_package
 from flytekit.tools.repo import serialize_to_folder
 
 CTX_IMAGE = "image"
@@ -165,13 +163,9 @@ def fast_workflows(ctx, folder=None):
     click.echo(f"Writing output to {folder}")
 
     source_dir = ctx.obj[CTX_LOCAL_SRC_ROOT]
-    digest = _compute_digest(source_dir)
     folder = folder if folder else ""
-    archive_fname = os.path.join(folder, f"{digest}.tar.gz")
-    click.echo(f"Writing compressed archive to {archive_fname}")
-    # Write using gzip
-    with _tarfile.open(archive_fname, "w:gz") as tar:
-        tar.add(source_dir, arcname="", filter=_filter_tar_file_fn)
+    archive_fname = fast_package(source_dir, folder)
+    click.echo(f"Wrote compressed archive to {archive_fname}")
 
     pkgs = ctx.obj[CTX_PACKAGES]
     dir = ctx.obj[CTX_LOCAL_SRC_ROOT]
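Both the removed _filter_tar_file_fn and the new ignore.tar_filter plug into the standard library's filter hook on TarFile.add: the callable is invoked once per archive member and may return None to drop it. A self-contained sketch of that mechanism (drop_pyc and the paths are illustrative, not part of the commit):

import tarfile


def drop_pyc(tarinfo: tarfile.TarInfo):
    # Returning None excludes the member; returning the TarInfo keeps it.
    return None if tarinfo.name.endswith(".pyc") else tarinfo


with tarfile.open("example.tar.gz", "w:gz") as tar:
    tar.add("my_project", arcname="", filter=drop_pyc)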
132 changes: 62 additions & 70 deletions flytekit/tools/fast_registration.py
@@ -1,48 +1,79 @@
-import os as _os
+from __future__ import annotations
+
+import hashlib
+import os
 import posixpath
 import subprocess as _subprocess
-import tarfile as _tarfile
-import tempfile as _tempfile
-from pathlib import Path as _Path
-
-import checksumdir
+import tarfile
+from typing import Optional
 
 from flytekit.core.context_manager import FlyteContextManager
+from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore
 
-_tmp_versions_dir = "tmp/versions"
+FAST_PREFIX = "fast"
+FAST_FILEENDING = ".tar.gz"
 
 file_access = FlyteContextManager.current_context().file_access
 
 
-def compute_digest(source_dir: _os.PathLike) -> str:
+def fast_package(source: os.PathLike, output_dir: os.PathLike) -> os.PathLike:
     """
-    Walks the entirety of the source dir to compute a deterministic hex digest of the dir contents.
-    :param _os.PathLike source_dir:
-    :return Text:
+    Takes a source directory and packages everything not covered by common ignores into a tarball
+    named after a hexdigest of the included files.
+    :param os.PathLike source:
+    :param os.PathLike output_dir:
+    :return os.PathLike:
     """
-    return f"fast{checksumdir.dirhash(source_dir, 'md5', include_paths=True)}"
+    ignore = IgnoreGroup(source, [GitIgnore, DockerIgnore, StandardIgnore])
+    digest = compute_digest(source, ignore.is_ignored)
+    archive_fname = f"{FAST_PREFIX}{digest}{FAST_FILEENDING}"
 
+    if output_dir:
+        archive_fname = os.path.join(output_dir, archive_fname)
 
-def _write_marker(marker: _os.PathLike):
-    try:
-        open(marker, "x")
-    except FileExistsError:
-        pass
+    with tarfile.open(archive_fname, "w:gz") as tar:
+        tar.add(source, arcname="", filter=ignore.tar_filter)
+
+    return archive_fname
 
 
-def filter_tar_file_fn(tarinfo: _tarfile.TarInfo) -> _tarfile.TarInfo:
+def compute_digest(source: os.PathLike, filter: Optional[callable] = None) -> str:
     """
-    Excludes designated file types from tar archive
-    :param _tarfile.TarInfo tarinfo:
-    :return _tarfile.TarInfo:
+    Walks the entirety of the source dir to compute a deterministic md5 hex digest of the dir contents.
+    :param os.PathLike source:
+    :param Ignore ignore:
+    :return Text:
     """
-    if tarinfo.name.endswith(".pyc"):
-        return None
-    if tarinfo.name.startswith(".cache"):
-        return None
-    if "__pycache__" in tarinfo.name:
-        return None
-    return tarinfo
+    hasher = hashlib.md5()
+    for root, _, files in os.walk(source, topdown=True):
+
+        files.sort()
+
+        for fname in files:
+            abspath = os.path.join(root, fname)
+            relpath = os.path.relpath(abspath, source)
+            if filter:
+                if filter(relpath):
+                    continue
+
+            _filehash_update(abspath, hasher)
+            _pathhash_update(relpath, hasher)
+
+    return hasher.hexdigest()
+
+
+def _filehash_update(path: os.PathLike, hasher: hashlib._Hash) -> None:
+    blocksize = 65536
+    with open(path, "rb") as f:
+        bytes = f.read(blocksize)
+        while bytes:
+            hasher.update(bytes)
+            bytes = f.read(blocksize)
+
+
+def _pathhash_update(path: os.PathLike, hasher: hashlib._Hash) -> None:
+    path_list = path.split(os.sep)
+    hasher.update("".join(path_list).encode("utf-8"))
 
 
 def get_additional_distribution_loc(remote_location: str, identifier: str) -> str:
@@ -54,59 +85,20 @@ def get_additional_distribution_loc(remote_location: str, identifier: str) -> str:
     return posixpath.join(remote_location, "{}.{}".format(identifier, "tar.gz"))
 
 
-def upload_package(source_dir: _os.PathLike, identifier: str, remote_location: str, dry_run=False) -> str:
-    """
-    Uploads the contents of the source dir as a tar package to a destination specified by the unique identifier and
-    remote_location.
-    :param _os.PathLike source_dir:
-    :param Text identifier:
-    :param Text remote_location:
-    :param bool dry_run:
-    :return Text:
-    """
-    tmp_versions_dir = _os.path.join(_os.getcwd(), _tmp_versions_dir)
-    _os.makedirs(tmp_versions_dir, exist_ok=True)
-    marker = _Path(_os.path.join(tmp_versions_dir, identifier))
-    full_remote_path = get_additional_distribution_loc(remote_location, identifier)
-    if _os.path.exists(marker):
-        print("Local marker for identifier {} already exists, skipping upload".format(identifier))
-        return full_remote_path
-
-    if file_access.exists(full_remote_path):
-        print("Remote file {} already exists, skipping upload".format(full_remote_path))
-        _write_marker(marker)
-        return full_remote_path
-
-    with _tempfile.NamedTemporaryFile() as fp:
-        # Write using gzip
-        with _tarfile.open(fp.name, "w:gz") as tar:
-            tar.add(source_dir, arcname="", filter=filter_tar_file_fn)
-        if dry_run:
-            print("Would upload {} to {}".format(fp.name, full_remote_path))
-        else:
-            file_access.put_data(fp.name, full_remote_path)
-            print("Uploaded {} to {}".format(fp.name, full_remote_path))
-
-    # Finally, touch the marker file so we have a flag in the future to avoid re-uploading the package dir as an
-    # optimization
-    _write_marker(marker)
-    return full_remote_path
-
-
 def download_distribution(additional_distribution: str, destination: str):
     """
     Downloads a remote code distribution and overwrites any local files.
     :param Text additional_distribution:
-    :param _os.PathLike destination:
+    :param os.PathLike destination:
     """
     file_access.get_data(additional_distribution, destination)
-    tarfile_name = _os.path.basename(additional_distribution)
+    tarfile_name = os.path.basename(additional_distribution)
    if not tarfile_name.endswith(".tar.gz"):
        raise ValueError("Unrecognized additional distribution format for {}".format(additional_distribution))

    # This will overwrite the existing user flyte workflow code in the current working code dir.
    result = _subprocess.run(
-        ["tar", "-xvf", _os.path.join(destination, tarfile_name), "-C", destination],
+        ["tar", "-xvf", os.path.join(destination, tarfile_name), "-C", destination],
         stdout=_subprocess.PIPE,
     )
     result.check_returncode()
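The digest that names the archive walks the tree with sorted file lists and hashes both file contents and relative paths, so it is deterministic for a given tree: repackaging an unchanged project yields the same archive name, and renaming a file changes the digest even when its contents do not. A quick check against the compute_digest shown above ("my_project" is a placeholder path):

from flytekit.tools.fast_registration import compute_digest

d1 = compute_digest("my_project")
d2 = compute_digest("my_project")
assert d1 == d2  # same tree, same digest

# An optional filter callable excludes matching relative paths from the hash:
d3 = compute_digest("my_project", filter=lambda rel: rel.endswith(".pyc"))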
(The remaining 8 of the 12 changed files are not shown here.)