Fix ignore tests on windows #967

Merged · 26 commits from fix-ignore-tests-on-windows into master · Apr 21, 2022

Commits
bf42f38  Add filter classes and tests (bimtauer, Mar 24, 2022)
9e925cd  Rename to ignore (bimtauer, Mar 24, 2022)
8f298a2  Remove old filter test (bimtauer, Mar 24, 2022)
a6ea4e2  Create archive function and test (bimtauer, Mar 24, 2022)
7c9e6cb  Refactor to use archive function (bimtauer, Mar 24, 2022)
d05a325  Lint (bimtauer, Mar 25, 2022)
226d24d  Add alternative version with ls-files (bimtauer, Mar 30, 2022)
17b6131  Add benchmark test (bimtauer, Mar 30, 2022)
c579190  Adjust tests to ignore included empty dirs (bimtauer, Apr 4, 2022)
149bfef  Remove old version and benchmark test (bimtauer, Apr 4, 2022)
2961a29  Add list file functionality (bimtauer, Apr 4, 2022)
046fcc6  Refactor digest as part of create archive (bimtauer, Apr 4, 2022)
576bb27  Remove unused code (bimtauer, Apr 4, 2022)
e614e51  Add test for digest (bimtauer, Apr 4, 2022)
6da6036  Include own digest and refactor to fast_package (bimtauer, Apr 5, 2022)
acf0799  Clean up rebase, integrate fast_package to package (bimtauer, Apr 5, 2022)
9a9091a  Include recursive check for empty dirs (bimtauer, Apr 5, 2022)
da379a9  Remove obsolete checksumdir, add docker-py (bimtauer, Apr 5, 2022)
69183ae  Dict key lookup for performance increase (bimtauer, Apr 5, 2022)
d031cb5  Fix requirements (bimtauer, Apr 5, 2022)
e239da9  Adjust click echos (bimtauer, Apr 5, 2022)
ebe2f2f  re-compile dependencies (bimtauer, Apr 5, 2022)
becdf2c  Fix lint (bimtauer, Apr 5, 2022)
5aa1a15  Remove obsolete import (bimtauer, Apr 6, 2022)
5f83195  Fix tests on windows (eapolinario, Apr 21, 2022)
3f112e6  Merge branch 'master' into fix-ignore-tests-on-windows (eapolinario, Apr 21, 2022)

17 changes: 13 additions & 4 deletions dev-requirements.txt
@@ -45,10 +45,6 @@ charset-normalizer==2.0.12
     # via
     #   -c requirements.txt
     #   requests
-checksumdir==1.2.0
-    # via
-    #   -c requirements.txt
-    #   flytekit
 click==8.1.2
     # via
     #   -c requirements.txt
@@ -102,6 +98,14 @@ docker-image-py==0.1.12
     # via
     #   -c requirements.txt
     #   flytekit
+docker-py==1.10.6
+    # via
+    #   -c requirements.txt
+    #   flytekit
+docker-pycreds==0.4.0
+    # via
+    #   -c requirements.txt
+    #   docker-py
 dockerpty==0.4.1
     # via docker-compose
 docopt==0.6.2
@@ -346,6 +350,7 @@ requests==2.27.1
     #   cookiecutter
     #   docker
     #   docker-compose
+    #   docker-py
     #   flytekit
     #   google-api-core
     #   google-cloud-bigquery
@@ -365,6 +370,8 @@ six==1.16.0
     #   -c requirements.txt
     #   bcrypt
     #   cookiecutter
+    #   docker-py
+    #   docker-pycreds
     #   dockerpty
     #   google-auth
     #   grpcio
@@ -414,8 +421,10 @@ virtualenv==20.14.0
     # via pre-commit
 websocket-client==0.59.0
     # via
+    #   -c requirements.txt
     #   docker
     #   docker-compose
+    #   docker-py
 wheel==0.37.1
     # via
     #   -c requirements.txt

10 changes: 8 additions & 2 deletions doc-requirements.txt
@@ -56,8 +56,6 @@ chardet==4.0.0
     # via binaryornot
 charset-normalizer==2.0.12
     # via requests
-checksumdir==1.2.0
-    # via flytekit
 click==8.1.2
     # via
     #   cookiecutter
@@ -99,6 +97,10 @@ diskcache==5.4.0
     # via flytekit
 docker-image-py==0.1.12
     # via flytekit
+docker-py==1.10.6
+    # via flytekit
+docker-pycreds==0.4.0
+    # via docker-py
 docstring-parser==0.13
     # via flytekit
 docutils==0.17.1
@@ -528,6 +530,7 @@ regex==2022.3.15
 requests==2.27.1
     # via
     #   cookiecutter
+    #   docker-py
     #   flytekit
     #   google-api-core
     #   google-cloud-bigquery
@@ -571,6 +574,8 @@ six==1.16.0
     #   asttokens
     #   bleach
     #   cookiecutter
+    #   docker-py
+    #   docker-pycreds
     #   google-auth
     #   grpcio
     #   imagehash
@@ -713,6 +718,7 @@ webencodings==0.5.1
     #   bleach
     #   tinycss2
 websocket-client==1.3.2
+    # via docker-py
     # via kubernetes
 wheel==0.37.1
     # via flytekit

12 changes: 3 additions & 9 deletions flytekit/clis/sdk_in_container/serialize.py
@@ -1,6 +1,5 @@
 import os
 import sys
-import tarfile as _tarfile
 import typing
 from enum import Enum as _Enum

@@ -10,8 +9,7 @@
 from flytekit.clis.sdk_in_container.constants import CTX_PACKAGES
 from flytekit.configuration import FastSerializationSettings, ImageConfig, SerializationSettings
 from flytekit.exceptions.scopes import system_entry_point
-from flytekit.tools.fast_registration import compute_digest as _compute_digest
-from flytekit.tools.fast_registration import filter_tar_file_fn as _filter_tar_file_fn
+from flytekit.tools.fast_registration import fast_package
 from flytekit.tools.repo import serialize_to_folder

 CTX_IMAGE = "image"
@@ -165,13 +163,9 @@ def fast_workflows(ctx, folder=None):
     click.echo(f"Writing output to {folder}")

     source_dir = ctx.obj[CTX_LOCAL_SRC_ROOT]
-    digest = _compute_digest(source_dir)
     folder = folder if folder else ""
-    archive_fname = os.path.join(folder, f"{digest}.tar.gz")
-    click.echo(f"Writing compressed archive to {archive_fname}")
-    # Write using gzip
-    with _tarfile.open(archive_fname, "w:gz") as tar:
-        tar.add(source_dir, arcname="", filter=_filter_tar_file_fn)
+    archive_fname = fast_package(source_dir, folder)
+    click.echo(f"Wrote compressed archive to {archive_fname}")

     pkgs = ctx.obj[CTX_PACKAGES]
     dir = ctx.obj[CTX_LOCAL_SRC_ROOT]
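
Net effect of this change: fast_workflows no longer hand-rolls the digest and tarball. A minimal sketch of the new call path, where "/tmp/app" and "/tmp/out" are hypothetical stand-ins for ctx.obj[CTX_LOCAL_SRC_ROOT] and the --folder option:

# Sketch only: paths are hypothetical stand-ins for the click context values.
from flytekit.tools.fast_registration import fast_package

source_dir = "/tmp/app"
folder = "/tmp/out"

# One call now computes the ignore-aware digest, writes fast<digest>.tar.gz
# into `folder`, and returns the archive path.
archive_fname = fast_package(source_dir, folder)
print(f"Wrote compressed archive to {archive_fname}")
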
132 changes: 62 additions & 70 deletions flytekit/tools/fast_registration.py
@@ -1,48 +1,79 @@
-import os as _os
+from __future__ import annotations
+
+import hashlib
+import os
 import posixpath
 import subprocess as _subprocess
-import tarfile as _tarfile
-import tempfile as _tempfile
-from pathlib import Path as _Path
-
-import checksumdir
+import tarfile
+from typing import Optional

 from flytekit.core.context_manager import FlyteContextManager
+from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore

 _tmp_versions_dir = "tmp/versions"
+FAST_PREFIX = "fast"
+FAST_FILEENDING = ".tar.gz"

 file_access = FlyteContextManager.current_context().file_access


-def compute_digest(source_dir: _os.PathLike) -> str:
+def fast_package(source: os.PathLike, output_dir: os.PathLike) -> os.PathLike:
     """
-    Walks the entirety of the source dir to compute a deterministic hex digest of the dir contents.
-    :param _os.PathLike source_dir:
-    :return Text:
+    Takes a source directory and packages everything not covered by common ignores into a tarball
+    named after a hexdigest of the included files.
+    :param os.PathLike source:
+    :param os.PathLike output_dir:
+    :return os.PathLike:
     """
-    return f"fast{checksumdir.dirhash(source_dir, 'md5', include_paths=True)}"
+    ignore = IgnoreGroup(source, [GitIgnore, DockerIgnore, StandardIgnore])
+    digest = compute_digest(source, ignore.is_ignored)
+    archive_fname = f"{FAST_PREFIX}{digest}{FAST_FILEENDING}"
+
+    if output_dir:
+        archive_fname = os.path.join(output_dir, archive_fname)

-def _write_marker(marker: _os.PathLike):
-    try:
-        open(marker, "x")
-    except FileExistsError:
-        pass
+    with tarfile.open(archive_fname, "w:gz") as tar:
+        tar.add(source, arcname="", filter=ignore.tar_filter)
+
+    return archive_fname

-def filter_tar_file_fn(tarinfo: _tarfile.TarInfo) -> _tarfile.TarInfo:
+
+def compute_digest(source: os.PathLike, filter: Optional[callable] = None) -> str:
     """
-    Excludes designated file types from tar archive
-    :param _tarfile.TarInfo tarinfo:
-    :return _tarfile.TarInfo:
+    Walks the entirety of the source dir to compute a deterministic md5 hex digest of the dir contents.
+    :param os.PathLike source:
+    :param Ignore ignore:
+    :return Text:
     """
-    if tarinfo.name.endswith(".pyc"):
-        return None
-    if tarinfo.name.startswith(".cache"):
-        return None
-    if "__pycache__" in tarinfo.name:
-        return None
-    return tarinfo
+    hasher = hashlib.md5()
+    for root, _, files in os.walk(source, topdown=True):
+
+        files.sort()
+
+        for fname in files:
+            abspath = os.path.join(root, fname)
+            relpath = os.path.relpath(abspath, source)
+            if filter:
+                if filter(relpath):
+                    continue
+
+            _filehash_update(abspath, hasher)
+            _pathhash_update(relpath, hasher)
+
+    return hasher.hexdigest()
+
+
+def _filehash_update(path: os.PathLike, hasher: hashlib._Hash) -> None:
+    blocksize = 65536
+    with open(path, "rb") as f:
+        bytes = f.read(blocksize)
+        while bytes:
+            hasher.update(bytes)
+            bytes = f.read(blocksize)
+
+
+def _pathhash_update(path: os.PathLike, hasher: hashlib._Hash) -> None:
+    path_list = path.split(os.sep)
+    hasher.update("".join(path_list).encode("utf-8"))
@@ -54,59 +85,20 @@ def get_additional_distribution_loc(remote_location: str, identifier: str) -> str:
     return posixpath.join(remote_location, "{}.{}".format(identifier, "tar.gz"))


-def upload_package(source_dir: _os.PathLike, identifier: str, remote_location: str, dry_run=False) -> str:
-    """
-    Uploads the contents of the source dir as a tar package to a destination specified by the unique identifier and
-    remote_location.
-    :param _os.PathLike source_dir:
-    :param Text identifier:
-    :param Text remote_location:
-    :param bool dry_run:
-    :return Text:
-    """
-    tmp_versions_dir = _os.path.join(_os.getcwd(), _tmp_versions_dir)
-    _os.makedirs(tmp_versions_dir, exist_ok=True)
-    marker = _Path(_os.path.join(tmp_versions_dir, identifier))
-    full_remote_path = get_additional_distribution_loc(remote_location, identifier)
-    if _os.path.exists(marker):
-        print("Local marker for identifier {} already exists, skipping upload".format(identifier))
-        return full_remote_path
-
-    if file_access.exists(full_remote_path):
-        print("Remote file {} already exists, skipping upload".format(full_remote_path))
-        _write_marker(marker)
-        return full_remote_path
-
-    with _tempfile.NamedTemporaryFile() as fp:
-        # Write using gzip
-        with _tarfile.open(fp.name, "w:gz") as tar:
-            tar.add(source_dir, arcname="", filter=filter_tar_file_fn)
-        if dry_run:
-            print("Would upload {} to {}".format(fp.name, full_remote_path))
-        else:
-            file_access.put_data(fp.name, full_remote_path)
-            print("Uploaded {} to {}".format(fp.name, full_remote_path))
-
-    # Finally, touch the marker file so we have a flag in the future to avoid re-uploading the package dir as an
-    # optimization
-    _write_marker(marker)
-    return full_remote_path
-
-
 def download_distribution(additional_distribution: str, destination: str):
     """
     Downloads a remote code distribution and overwrites any local files.
     :param Text additional_distribution:
-    :param _os.PathLike destination:
+    :param os.PathLike destination:
     """
     file_access.get_data(additional_distribution, destination)
-    tarfile_name = _os.path.basename(additional_distribution)
+    tarfile_name = os.path.basename(additional_distribution)
     if not tarfile_name.endswith(".tar.gz"):
         raise ValueError("Unrecognized additional distribution format for {}".format(additional_distribution))

     # This will overwrite the existing user flyte workflow code in the current working code dir.
     result = _subprocess.run(
-        ["tar", "-xvf", _os.path.join(destination, tarfile_name), "-C", destination],
+        ["tar", "-xvf", os.path.join(destination, tarfile_name), "-C", destination],
         stdout=_subprocess.PIPE,
     )
     result.check_returncode()
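
Taken together, the new module hashes both file contents and relative paths (_filehash_update plus _pathhash_update, so a rename changes the digest) and names the archive after that digest. A hedged end-to-end sketch, assuming the ignore classes tolerate a directory that is not a git repository (the scratch tree below is hypothetical):

# Sketch: exercise compute_digest and fast_package on a throwaway tree.
import os
import tempfile

from flytekit.tools.fast_registration import compute_digest, fast_package

src = tempfile.mkdtemp()
with open(os.path.join(src, "wf.py"), "w") as f:
    f.write("print('hello')\n")

# Hashing is deterministic: identical contents and paths yield the same digest.
assert compute_digest(src) == compute_digest(src)

# The archive name embeds the digest: fast<md5-hexdigest>.tar.gz
archive = fast_package(src, tempfile.mkdtemp())
print(os.path.basename(archive))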