diff --git a/src/macaron/repo_finder/repo_finder.py b/src/macaron/repo_finder/repo_finder.py index d9b4df1e5..29b114a11 100644 --- a/src/macaron/repo_finder/repo_finder.py +++ b/src/macaron/repo_finder/repo_finder.py @@ -36,28 +36,48 @@ import os from urllib.parse import ParseResult, urlunparse +from git import InvalidGitRepositoryError from packageurl import PackageURL +from pydriller import Git from macaron.config.defaults import defaults from macaron.config.global_config import global_config +from macaron.errors import CloneError, RepoCheckOutError from macaron.repo_finder import to_domain_from_known_purl_types -from macaron.repo_finder.commit_finder import match_tags +from macaron.repo_finder.commit_finder import find_commit, match_tags from macaron.repo_finder.repo_finder_base import BaseRepoFinder from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder from macaron.repo_finder.repo_finder_java import JavaRepoFinder -from macaron.repo_finder.repo_utils import generate_report, prepare_repo -from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR, list_remote_references +from macaron.repo_finder.repo_utils import ( + check_repo_urls_are_equivalent, + generate_report, + get_git_service, + get_local_repos_path, +) +from macaron.slsa_analyzer.git_url import ( + GIT_REPOS_DIR, + check_out_repo_target, + get_remote_origin_of_local_repo, + get_remote_vcs_url, + get_repo_dir_name, + is_empty_repo, + is_remote_repo, + list_remote_references, + resolve_local_path, +) logger: logging.Logger = logging.getLogger(__name__) -def find_repo(purl: PackageURL) -> str: +def find_repo(purl: PackageURL, check_latest_version: bool = True) -> str: """Retrieve the repository URL that matches the given PURL. Parameters ---------- purl : PackageURL The parsed PURL to convert to the repository path. + check_latest_version: bool + A flag that determines whether the latest version of the PURL is also checked. 
Returns ------- @@ -80,7 +100,22 @@ def find_repo(purl: PackageURL) -> str: # Call Repo Finder and return first valid URL logger.debug("Analyzing %s with Repo Finder: %s", purl, type(repo_finder)) - return repo_finder.find_repo(purl) + found_repo = repo_finder.find_repo(purl) + + if found_repo or not check_latest_version: + return found_repo + + # Try to find the latest version repo. + logger.error("Could not find repo for PURL: %s", purl) + latest_version_purl = get_latest_purl_if_different(purl) + if not latest_version_purl: + logger.debug("Could not find newer PURL than provided: %s", purl) + return "" + + found_repo = DepsDevRepoFinder().find_repo(latest_version_purl) + if not found_repo: + logger.debug("Could not find repo from latest version of PURL: %s", latest_version_purl) + return found_repo def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None: @@ -113,8 +148,7 @@ def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None: domain = to_domain_from_known_purl_types(purl.type) or (purl.type if purl.type in available_domains else None) if not domain: logger.info("The PURL type of %s is not valid as a repository type. Trying to find the repository...", purl) - # Try to find the repository - return find_repo(purl) + return None if not purl.namespace: logger.error("Expecting a non-empty namespace from %s.", purl) @@ -133,7 +167,7 @@ def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None: ) -def find_source(purl_string: str, input_repo: str | None) -> bool: +def find_source(purl_string: str, input_repo: str | None, latest_version_fallback: bool = True) -> bool: """Perform repo and commit finding for a passed PURL, or commit finding for a passed PURL and repo. Parameters @@ -142,6 +176,8 @@ def find_source(purl_string: str, input_repo: str | None) -> bool: The PURL string of the target. input_repo: str | None The repository path optionally provided by the user. 
+ latest_version_fallback: bool + A flag that determines whether the latest version of the same artifact can be checked as a fallback option. Returns ------- @@ -149,17 +185,25 @@ def find_source(purl_string: str, input_repo: str | None) -> bool: True if the source was found. """ try: - purl = PackageURL.from_string(purl_string) + purl: PackageURL | None = PackageURL.from_string(purl_string) except ValueError as error: - logger.error("Could not parse PURL: %s", error) + logger.error("Could not parse PURL: '%s'. Error: %s", purl_string, error) return False - if not purl.version: - logger.debug("PURL is missing version.") + if not purl: + # Unreachable. return False + checked_latest_purl = False + if not purl.version: + purl = get_latest_purl_if_different(purl) + if not purl or not purl.version: + logger.error("PURL is missing version.") + return False + checked_latest_purl = True + found_repo = input_repo - if not input_repo: + if not found_repo: logger.debug("Searching for repo of PURL: %s", purl) found_repo = find_repo(purl) @@ -170,43 +214,47 @@ def find_source(purl_string: str, input_repo: str | None) -> bool: # Disable other loggers for cleaner output. logging.getLogger("macaron.slsa_analyzer.analyzer").disabled = True + digest = None if defaults.getboolean("repofinder", "find_source_should_clone"): + # Clone the repo to retrieve the tags. logger.debug("Preparing repo: %s", found_repo) repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR) logging.getLogger("macaron.slsa_analyzer.git_url").disabled = True - git_obj = prepare_repo( - repo_dir, - found_repo, - purl=purl, - ) + # The prepare_repo function will also check the latest version of the artifact if required. + git_obj = prepare_repo(repo_dir, found_repo, purl=purl, latest_version_fallback=not checked_latest_purl) - if not git_obj: - # TODO expand this message to cover cases where the obj was not created due to lack of correct tag. 
def get_latest_purl_if_different(purl: PackageURL) -> PackageURL | None:
    """Return the latest version of an artifact represented by a PURL, if it is different.

    Parameters
    ----------
    purl : PackageURL
        The PURL of the analysis target.

    Returns
    -------
    PackageURL | None
        The latest PURL, or None if they are the same or an error occurs.
    """
    if purl.version:
        # Build the version-less PURL from its components rather than formatting and re-parsing a string:
        # the attributes are already decoded, so a string round trip could mis-parse names or namespaces
        # that contain characters with special meaning in a PURL (e.g. "@", "%").
        no_version_purl = PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name)
    else:
        no_version_purl = purl

    latest_version_purl = DepsDevRepoFinder.get_latest_version(no_version_purl)
    if not latest_version_purl:
        logger.error("Latest version PURL could not be found.")
        return None

    # Nothing new to try when the caller already holds the latest version.
    if latest_version_purl == purl:
        logger.error("Latest version PURL is the same as the current.")
        return None

    logger.debug("Found new version of PURL: %s", latest_version_purl)
    return latest_version_purl


def get_latest_repo_if_different(latest_version_purl: PackageURL, original_repo: str) -> str:
    """Return the repository of the passed PURL if it is different to the passed repository.

    Parameters
    ----------
    latest_version_purl: PackageURL
        The PURL to use.
    original_repo: str
        The repository to compare against.

    Returns
    -------
    str
        The latest repository, or an empty string if not found or if it is equivalent to ``original_repo``.
    """
    # The second argument disables the latest-version check inside find_repo: the PURL passed here is
    # already the latest version.
    latest_repo = find_repo(latest_version_purl, False)
    if not latest_repo:
        logger.error("Could not find repository from latest PURL: %s", latest_version_purl)
        return ""

    if check_repo_urls_are_equivalent(original_repo, latest_repo):
        logger.error(
            "Repository from latest PURL is equivalent to original repository: %s ~= %s", latest_repo, original_repo
        )
        return ""

    logger.debug("Found new repository from latest PURL: %s", latest_repo)
    return latest_repo
def prepare_repo(
    target_dir: str,
    repo_path: str,
    branch_name: str = "",
    digest: str = "",
    purl: PackageURL | None = None,
    latest_version_fallback: bool = True,
) -> Git | None:
    """Prepare the target repository for analysis.

    If ``repo_path`` is a remote path, the target repo is cloned to ``{target_dir}/{unique_path}``.
    The ``unique_path`` of a repository will depend on its remote url.
    For example, if given the ``repo_path`` https://github.com/org/name.git, it will
    be cloned to ``{target_dir}/github.com/org/name``.

    If ``repo_path`` is a local path, this method will check if ``repo_path`` resolves to a directory inside
    ``local_repos_path`` and to a valid git repository.

    If no ``digest`` is given and the versioned ``purl`` cannot be mapped to a commit in the repository, the
    repository of the latest version of the same artifact is tried once (when ``latest_version_fallback`` is
    True and that repository differs from ``repo_path``).

    Parameters
    ----------
    target_dir : str
        The directory where all remote repository will be cloned.
    repo_path : str
        The path to the repository, can be either local or remote.
    branch_name : str
        The name of the branch we want to checkout.
    digest : str
        The hash of the commit that we want to checkout in the branch.
    purl : PackageURL | None
        The PURL of the analysis target.
    latest_version_fallback: bool
        A flag that determines whether the latest version of the same artifact can be checked as a fallback option.

    Returns
    -------
    Git | None
        The pydriller.Git object of the repository or None if error.
    """
    # TODO: separate the logic for handling remote and local repos instead of putting them into this method.
    logger.info(
        "Preparing the repository for the analysis (path=%s, branch=%s, digest=%s)",
        repo_path,
        branch_name,
        digest,
    )

    resolved_local_path = ""
    is_remote = is_remote_repo(repo_path)

    if is_remote:
        logger.info("The path to repo %s is a remote path.", repo_path)
        resolved_remote_path = get_remote_vcs_url(repo_path)
        if not resolved_remote_path:
            logger.error("The provided path to repo %s is not a valid remote path.", repo_path)
            return None

        git_service = get_git_service(resolved_remote_path)
        repo_unique_path = get_repo_dir_name(resolved_remote_path)
        resolved_local_path = os.path.join(target_dir, repo_unique_path)
        logger.info("Cloning the repository.")
        try:
            git_service.clone_repo(resolved_local_path, resolved_remote_path)
        except CloneError as error:
            logger.error("Cannot clone %s: %s", resolved_remote_path, str(error))
            return None
    else:
        logger.info("Checking if the path to repo %s is a local path.", repo_path)
        resolved_local_path = resolve_local_path(get_local_repos_path(), repo_path)

    if resolved_local_path:
        try:
            git_obj = Git(resolved_local_path)
        except InvalidGitRepositoryError:
            logger.error("No git repo exists at %s.", resolved_local_path)
            return None
    else:
        logger.error("Error happened while preparing the repo.")
        return None

    if is_empty_repo(git_obj):
        logger.error("The target repository does not have any commit.")
        return None

    # Find the digest and branch if a version has been specified
    if not digest and purl and purl.version:
        found_digest = find_commit(git_obj, purl)
        if not found_digest:
            logger.error("Could not map the input purl string to a specific commit in the corresponding repository.")
            if not latest_version_fallback:
                return None
            # If the commit could not be found, check if the latest version of the artifact has a different repository.
            latest_purl = get_latest_purl_if_different(purl)
            if not latest_purl:
                return None
            latest_repo = get_latest_repo_if_different(latest_purl, repo_path)
            if not latest_repo:
                return None
            # Retry once against the newly found repository. Arguments are passed by keyword so the clone
            # directory, repository and branch cannot be swapped. The original PURL is kept so the commit for
            # the requested version is searched for in the new repository, and the fallback is disabled to
            # guarantee the recursion terminates.
            return prepare_repo(
                target_dir=target_dir,
                repo_path=latest_repo,
                branch_name=branch_name,
                digest=digest,
                purl=purl,
                latest_version_fallback=False,
            )

        digest = found_digest

    # Checking out the specific branch or commit. This operation varies depends on the git service that the
    # repository uses.
    if not is_remote:
        # If the repo path provided by the user is a local path, we need to get the actual origin remote URL of
        # the repo to decide on the suitable git service.
        origin_remote_url = get_remote_origin_of_local_repo(git_obj)
        if is_remote_repo(origin_remote_url):
            # The local repo's origin remote url is a remote URL (e.g https://host.com/a/b): In this case, we obtain
            # the corresponding git service using ``get_git_service``.
            git_service = get_git_service(origin_remote_url)
        else:
            # The local repo's origin remote url is a local path (e.g /path/to/local/...). This happens when the
            # target repository is a clone from another local repo or is a clone from a git archive -
            # https://git-scm.com/docs/git-archive: In this case, we fall-back to the generic function
            # ``git_url.check_out_repo_target``.
            if not check_out_repo_target(git_obj, branch_name, digest, not is_remote):
                logger.error("Cannot checkout the specific branch or commit of the target repo.")
                return None

            return git_obj

    try:
        git_service.check_out_repo(git_obj, branch_name, digest, not is_remote)
    except RepoCheckOutError as error:
        logger.error("Failed to check out repository at %s", resolved_local_path)
        logger.error(error)
        return None

    return git_obj
@staticmethod
def get_latest_version(purl: PackageURL) -> PackageURL | None:
    """Return a PURL representing the latest version of the passed artifact.

    Parameters
    ----------
    purl : PackageURL
        The current PURL.

    Returns
    -------
    PackageURL | None
        The latest version of the PURL, or None if it could not be found.
    """
    if purl.version:
        # Query the version-less package. Construct it from components instead of formatting and re-parsing
        # a string, so that already-decoded namespace / name values are not decoded a second time.
        purl = PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name)

    url = f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}"
    response = send_get_http_raw(url)
    if not response:
        return None

    try:
        metadata: dict = json.loads(response.text)
    except ValueError as error:
        logger.debug("Failed to parse response from deps.dev: %s", error)
        return None

    # The version list is nested under "package" when that key is present in the response.
    versions_keys = ["package", "versions"] if "package" in metadata else ["version"]
    versions = json_extract(metadata, versions_keys, list)
    if not versions:
        return None

    # NOTE(review): the last entry is treated as the latest version; this assumes the API lists versions
    # oldest-first -- confirm against the deps.dev documentation.
    latest_version = json_extract(versions[-1], ["versionKey", "version"], str)
    if not latest_version:
        return None

    return PackageURL(type=purl.type, namespace=purl.namespace, name=purl.name, version=latest_version)


def _create_urls(self, purl: PackageURL) -> list[str]:
    """
    Create the urls to search for the metadata relating to the passed artifact.

    If a version is not specified, remote API calls will be used to try and find one.

    Parameters
    ----------
    purl : PackageURL
        The PURL of an artifact.

    Returns
    -------
    list[str]
        The list of created URLs; empty if the PURL has no version and the latest one could not be found.
    """
    if not purl.version:
        latest_purl = DepsDevRepoFinder.get_latest_version(purl)
        if not latest_purl:
            return []
        purl = latest_purl

    # The whole PURL is percent-encoded (safe='') because it is embedded as a single path segment.
    return [f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}"]
macaron.config.global_config import global_config -from macaron.errors import CloneError, RepoCheckOutError -from macaron.repo_finder.commit_finder import find_commit from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService -from macaron.slsa_analyzer.git_url import ( - GIT_REPOS_DIR, - check_out_repo_target, - get_remote_origin_of_local_repo, - get_remote_vcs_url, - get_repo_dir_name, - is_empty_repo, - is_remote_repo, - resolve_local_path, -) +from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR logger: logging.Logger = logging.getLogger(__name__) @@ -125,125 +112,6 @@ def create_report(purl: str, commit: str, repo: str) -> str: return json.dumps(data, indent=4) -def prepare_repo( - target_dir: str, - repo_path: str, - branch_name: str = "", - digest: str = "", - purl: PackageURL | None = None, -) -> Git | None: - """Prepare the target repository for analysis. - - If ``repo_path`` is a remote path, the target repo is cloned to ``{target_dir}/{unique_path}``. - The ``unique_path`` of a repository will depend on its remote url. - For example, if given the ``repo_path`` https://github.com/org/name.git, it will - be cloned to ``{target_dir}/github.com/org/name``. - - If ``repo_path`` is a local path, this method will check if ``repo_path`` resolves to a directory inside - ``local_repos_path`` and to a valid git repository. - - Parameters - ---------- - target_dir : str - The directory where all remote repository will be cloned. - repo_path : str - The path to the repository, can be either local or remote. - branch_name : str - The name of the branch we want to checkout. - digest : str - The hash of the commit that we want to checkout in the branch. - purl : PackageURL | None - The PURL of the analysis target. - - Returns - ------- - Git | None - The pydriller.Git object of the repository or None if error. 
- """ - # TODO: separate the logic for handling remote and local repos instead of putting them into this method. - logger.info( - "Preparing the repository for the analysis (path=%s, branch=%s, digest=%s)", - repo_path, - branch_name, - digest, - ) - - resolved_local_path = "" - is_remote = is_remote_repo(repo_path) - - if is_remote: - logger.info("The path to repo %s is a remote path.", repo_path) - resolved_remote_path = get_remote_vcs_url(repo_path) - if not resolved_remote_path: - logger.error("The provided path to repo %s is not a valid remote path.", repo_path) - return None - - git_service = get_git_service(resolved_remote_path) - repo_unique_path = get_repo_dir_name(resolved_remote_path) - resolved_local_path = os.path.join(target_dir, repo_unique_path) - logger.info("Cloning the repository.") - try: - git_service.clone_repo(resolved_local_path, resolved_remote_path) - except CloneError as error: - logger.error("Cannot clone %s: %s", resolved_remote_path, str(error)) - return None - else: - logger.info("Checking if the path to repo %s is a local path.", repo_path) - resolved_local_path = resolve_local_path(get_local_repos_path(), repo_path) - - if resolved_local_path: - try: - git_obj = Git(resolved_local_path) - except InvalidGitRepositoryError: - logger.error("No git repo exists at %s.", resolved_local_path) - return None - else: - logger.error("Error happened while preparing the repo.") - return None - - if is_empty_repo(git_obj): - logger.error("The target repository does not have any commit.") - return None - - # Find the digest and branch if a version has been specified - if not digest and purl and purl.version: - found_digest = find_commit(git_obj, purl) - if not found_digest: - logger.error("Could not map the input purl string to a specific commit in the corresponding repository.") - return None - digest = found_digest - - # Checking out the specific branch or commit. This operation varies depends on the git service that the - # repository uses. 
- if not is_remote: - # If the repo path provided by the user is a local path, we need to get the actual origin remote URL of - # the repo to decide on the suitable git service. - origin_remote_url = get_remote_origin_of_local_repo(git_obj) - if is_remote_repo(origin_remote_url): - # The local repo's origin remote url is a remote URL (e.g https://host.com/a/b): In this case, we obtain - # the corresponding git service using ``self.get_git_service``. - git_service = get_git_service(origin_remote_url) - else: - # The local repo's origin remote url is a local path (e.g /path/to/local/...). This happens when the - # target repository is a clone from another local repo or is a clone from a git archive - - # https://git-scm.com/docs/git-archive: In this case, we fall-back to the generic function - # ``git_url.check_out_repo_target``. - if not check_out_repo_target(git_obj, branch_name, digest, not is_remote): - logger.error("Cannot checkout the specific branch or commit of the target repo.") - return None - - return git_obj - - try: - git_service.check_out_repo(git_obj, branch_name, digest, not is_remote) - except RepoCheckOutError as error: - logger.error("Failed to check out repository at %s", resolved_local_path) - logger.error(error) - return None - - return git_obj - - def get_local_repos_path() -> str: """Get the local repos path from global config or use default. @@ -278,3 +146,26 @@ def get_git_service(remote_path: str | None) -> BaseGitService: return git_service return NoneGitService() + + +def check_repo_urls_are_equivalent(repo_1: str, repo_2: str) -> bool: + """Check if the two passed repo URLs are equivalent. + + Parameters + ---------- + repo_1: str + The first repository URL as a string. + repo_2: str + The second repository URL as a string. + + Returns + ------- + bool + True if the repository URLs have equal hostnames and paths, otherwise False. 
+ """ + repo_url_1 = urlparse(repo_1) + repo_url_2 = urlparse(repo_2) + if repo_url_1.hostname != repo_url_2.hostname or repo_url_1.path != repo_url_2.path: + return False + + return True diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index a5fd67f22..e95c29a5a 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -43,7 +43,8 @@ extract_repo_and_commit_from_provenance, ) from macaron.repo_finder.provenance_finder import ProvenanceFinder, find_provenance_from_ci -from macaron.repo_finder.repo_utils import get_git_service, prepare_repo +from macaron.repo_finder.repo_finder import prepare_repo +from macaron.repo_finder.repo_utils import get_git_service from macaron.repo_verifier.repo_verifier import verify_repo from macaron.slsa_analyzer import git_url from macaron.slsa_analyzer.analyze_context import AnalyzeContext diff --git a/tests/integration/cases/latest_repo_comparison/check_output.sh b/tests/integration/cases/latest_repo_comparison/check_output.sh new file mode 100755 index 000000000..c8e9cbf2e --- /dev/null +++ b/tests/integration/cases/latest_repo_comparison/check_output.sh @@ -0,0 +1,6 @@ +#!/bin/bash +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +[[ "$(jq -r '.commit' output/reports/maven/io_avaje/avaje-prisms/avaje-prisms.source.json)" = "1f6f953df0b58f0c35b5e136f62f63ba7a22bc03" ]] && +[[ "$(jq -r '.repo' output/reports/maven/io_avaje/avaje-prisms/avaje-prisms.source.json)" = "https://github.com/avaje/avaje-prisms" ]] diff --git a/tests/integration/cases/latest_repo_comparison/test.yaml b/tests/integration/cases/latest_repo_comparison/test.yaml new file mode 100644 index 000000000..3731b88c8 --- /dev/null +++ b/tests/integration/cases/latest_repo_comparison/test.yaml @@ -0,0 +1,36 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +description: | + Check that the find-source and analyze commands behave the same for a given artifact. + +tags: +- macaron-python-package +- macaron-docker-image + +steps: +- name: Run macaron find source + kind: find-source + options: + command_args: + - -purl + - pkg:maven/io.avaje/avaje-prisms@1.1 +- name: Check that the repository was not cloned + kind: shell + options: + cmd: ls output/git_repos/github.com/avaje/avaje-prisms/ + expect_fail: true +- name: Check the report contents + kind: shell + options: + cmd: ./check_output.sh +- name: Run macaron analyze + kind: analyze + options: + command_args: + - -purl + - pkg:maven/io.avaje/avaje-prisms@1.1 +- name: Check that correct repository was cloned + kind: shell + options: + cmd: ls output/git_repos/github.com/avaje/avaje-prisms/ diff --git a/tests/integration/cases/repo_finder_remote_calls/repo_finder.py b/tests/integration/cases/repo_finder_remote_calls/repo_finder.py index 12f10cac1..f529cb771 100644 --- a/tests/integration/cases/repo_finder_remote_calls/repo_finder.py +++ b/tests/integration/cases/repo_finder_remote_calls/repo_finder.py @@ -12,6 +12,7 @@ from macaron.config.defaults import defaults from macaron.repo_finder import repo_validator from 
macaron.repo_finder.repo_finder import find_repo +from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder from macaron.slsa_analyzer.git_url import clean_url logger: logging.Logger = logging.getLogger(__name__) @@ -70,6 +71,21 @@ def test_repo_finder() -> int: if not parsed_url or not repo_validator.resolve_redirects(parsed_url): return os.EX_UNAVAILABLE + # Test Java package whose SCM metadata only points to the repo in later versions than is provided here. + purl = PackageURL.from_string("pkg:maven/io.vertx/vertx-auth-common@3.8.0") + repo = find_repo(purl) + if repo == "https://github.com/eclipse-vertx/vertx-auth": + return os.EX_UNAVAILABLE + latest_purl = DepsDevRepoFinder().get_latest_version(purl) + assert latest_purl + repo = find_repo(latest_purl) + if repo != "https://github.com/eclipse-vertx/vertx-auth": + return os.EX_UNAVAILABLE + + # Test Java package that has no version. + if not find_repo(PackageURL.from_string("pkg:maven/io.vertx/vertx-auth-common")): + return os.EX_UNAVAILABLE + return os.EX_OK