diff --git a/docs/source/pages/developers_guide/apidoc/macaron.database.rst b/docs/source/pages/developers_guide/apidoc/macaron.database.rst
index 9529c9a60..800ca7c41 100644
--- a/docs/source/pages/developers_guide/apidoc/macaron.database.rst
+++ b/docs/source/pages/developers_guide/apidoc/macaron.database.rst
@@ -17,10 +17,10 @@ macaron.database.database\_manager module
:undoc-members:
:show-inheritance:
-macaron.database.rfc3339\_datetime module
+macaron.database.db\_custom\_types module
-----------------------------------------
-.. automodule:: macaron.database.rfc3339_datetime
+.. automodule:: macaron.database.db_custom_types
:members:
:undoc-members:
:show-inheritance:
diff --git a/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.metadata.rst b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.metadata.rst
new file mode 100644
index 000000000..1d6e6033a
--- /dev/null
+++ b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.metadata.rst
@@ -0,0 +1,58 @@
+macaron.malware\_analyzer.pypi\_heuristics.metadata package
+===========================================================
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.metadata
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+Submodules
+----------
+
+macaron.malware\_analyzer.pypi\_heuristics.metadata.closer\_release\_join\_date module
+--------------------------------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+macaron.malware\_analyzer.pypi\_heuristics.metadata.empty\_project\_link module
+-------------------------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+macaron.malware\_analyzer.pypi\_heuristics.metadata.high\_release\_frequency module
+-----------------------------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+macaron.malware\_analyzer.pypi\_heuristics.metadata.one\_release module
+-----------------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.metadata.one_release
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+macaron.malware\_analyzer.pypi\_heuristics.metadata.unchanged\_release module
+-----------------------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.metadata.unchanged_release
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+macaron.malware\_analyzer.pypi\_heuristics.metadata.unreachable\_project\_links module
+--------------------------------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.metadata.unreachable_project_links
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.rst b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.rst
new file mode 100644
index 000000000..c1a16251c
--- /dev/null
+++ b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.rst
@@ -0,0 +1,35 @@
+macaron.malware\_analyzer.pypi\_heuristics package
+==================================================
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+Subpackages
+-----------
+
+.. toctree::
+ :maxdepth: 1
+
+ macaron.malware_analyzer.pypi_heuristics.metadata
+ macaron.malware_analyzer.pypi_heuristics.sourcecode
+
+Submodules
+----------
+
+macaron.malware\_analyzer.pypi\_heuristics.base\_analyzer module
+----------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.base_analyzer
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+macaron.malware\_analyzer.pypi\_heuristics.heuristics module
+------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.heuristics
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.sourcecode.rst b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.sourcecode.rst
new file mode 100644
index 000000000..f53afc8d8
--- /dev/null
+++ b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.pypi_heuristics.sourcecode.rst
@@ -0,0 +1,18 @@
+macaron.malware\_analyzer.pypi\_heuristics.sourcecode package
+=============================================================
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.sourcecode
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+Submodules
+----------
+
+macaron.malware\_analyzer.pypi\_heuristics.sourcecode.suspicious\_setup module
+------------------------------------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.pypi_heuristics.sourcecode.suspicious_setup
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.rst b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.rst
new file mode 100644
index 000000000..2b7785b9b
--- /dev/null
+++ b/docs/source/pages/developers_guide/apidoc/macaron.malware_analyzer.rst
@@ -0,0 +1,26 @@
+macaron.malware\_analyzer package
+=================================
+
+.. automodule:: macaron.malware_analyzer
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+Subpackages
+-----------
+
+.. toctree::
+ :maxdepth: 1
+
+ macaron.malware_analyzer.pypi_heuristics
+
+Submodules
+----------
+
+macaron.malware\_analyzer.datetime\_parser module
+-------------------------------------------------
+
+.. automodule:: macaron.malware_analyzer.datetime_parser
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/source/pages/developers_guide/apidoc/macaron.rst b/docs/source/pages/developers_guide/apidoc/macaron.rst
index 13cb96478..3dffa65fc 100644
--- a/docs/source/pages/developers_guide/apidoc/macaron.rst
+++ b/docs/source/pages/developers_guide/apidoc/macaron.rst
@@ -16,6 +16,7 @@ Subpackages
macaron.config
macaron.database
macaron.dependency_analyzer
+ macaron.malware_analyzer
macaron.output_reporter
macaron.parsers
macaron.policy_engine
diff --git a/docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.checks.rst b/docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.checks.rst
index ec5c7db2b..7cf277e5b 100644
--- a/docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.checks.rst
+++ b/docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.checks.rst
@@ -49,6 +49,14 @@ macaron.slsa\_analyzer.checks.check\_result module
:undoc-members:
:show-inheritance:
+macaron.slsa\_analyzer.checks.detect\_malicious\_metadata\_check module
+-----------------------------------------------------------------------
+
+.. automodule:: macaron.slsa_analyzer.checks.detect_malicious_metadata_check
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
macaron.slsa\_analyzer.checks.infer\_artifact\_pipeline\_check module
---------------------------------------------------------------------
diff --git a/docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.package_registry.rst b/docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.package_registry.rst
index 635f9adf6..72a2c35ea 100644
--- a/docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.package_registry.rst
+++ b/docs/source/pages/developers_guide/apidoc/macaron.slsa_analyzer.package_registry.rst
@@ -40,3 +40,11 @@ macaron.slsa\_analyzer.package\_registry.package\_registry module
:members:
:undoc-members:
:show-inheritance:
+
+macaron.slsa\_analyzer.package\_registry.pypi\_registry module
+--------------------------------------------------------------
+
+.. automodule:: macaron.slsa_analyzer.package_registry.pypi_registry
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/pyproject.toml b/pyproject.toml
index 8baaba188..5e8ab172d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -95,6 +95,7 @@ test = [
"pytest-custom_exit_code >=0.3.0,<1.0.0",
"pytest-cov >=5.0.0,<6.0.0",
"pytest-env >=1.0.0,<2.0.0",
+ "pytest_httpserver >=1.0.10,<2.0.0",
"syrupy >=4.0.0,<5.0.0",
]
diff --git a/src/macaron/code_analyzer/call_graph.py b/src/macaron/code_analyzer/call_graph.py
index 3f21ddb5c..1f3be3fac 100644
--- a/src/macaron/code_analyzer/call_graph.py
+++ b/src/macaron/code_analyzer/call_graph.py
@@ -24,8 +24,8 @@ class BaseNode(Generic[Node]):
def __init__(self, caller: Node | None = None, node_id: str | None = None) -> None:
"""Initialize instance.
- Parameter
- ---------
+ Parameters
+ ----------
caller: Node | None
The caller node.
node_id: str | None
diff --git a/src/macaron/config/defaults.ini b/src/macaron/config/defaults.ini
index c1300efb3..ac4046376 100644
--- a/src/macaron/config/defaults.ini
+++ b/src/macaron/config/defaults.ini
@@ -519,7 +519,10 @@ request_timeout = 20
[package_registry.pypi]
request_timeout = 20
-hostname = pypi.org
+registry_url_netloc = pypi.org
+registry_url_scheme = https
+fileserver_url_netloc = files.pythonhosted.org
+fileserver_url_scheme = https
# Configuration options for selecting the checks to run.
# Both the exclude and include are defined as list of strings:
diff --git a/src/macaron/database/rfc3339_datetime.py b/src/macaron/database/db_custom_types.py
similarity index 68%
rename from src/macaron/database/rfc3339_datetime.py
rename to src/macaron/database/db_custom_types.py
index a597b74be..f40256099 100644
--- a/src/macaron/database/rfc3339_datetime.py
+++ b/src/macaron/database/db_custom_types.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
"""This module implements SQLAlchemy type for converting date format to RFC3339 string representation."""
@@ -6,7 +6,7 @@
import datetime
from typing import Any
-from sqlalchemy import String, TypeDecorator
+from sqlalchemy import JSON, String, TypeDecorator
class RFC3339DateTime(TypeDecorator): # pylint: disable=W0223
@@ -60,3 +60,35 @@ def process_result_value(self, value: None | str, dialect: Any) -> None | dateti
if result.tzinfo:
return result
return result.astimezone(RFC3339DateTime._host_tzinfo)
+
+
+class DBJsonDict(TypeDecorator): # pylint: disable=W0223
+ """SQLAlchemy column type to serialize dictionaries."""
+
+ # It is stored in the database as a json value.
+ impl = JSON
+
+ # To prevent Sphinx from rendering the docstrings for `cache_ok`, make this docstring private.
+ #: :meta private:
+ cache_ok = True
+
+ def process_bind_param(self, value: None | dict, dialect: Any) -> None | dict:
+ """Process when storing a dict object to the SQLite db.
+
+ value: None | dict
+ The value being stored
+ """
+ if not isinstance(value, dict):
+ raise TypeError("DBJsonDict type expects a dict.")
+
+ return value
+
+ def process_result_value(self, value: None | dict, dialect: Any) -> None | dict:
+ """Process when loading a dict object from the SQLite db.
+
+ value: None | dict
+ The value being loaded
+ """
+ if not isinstance(value, dict):
+ raise TypeError("DBJsonDict type expects a dict.")
+ return value
diff --git a/src/macaron/database/table_definitions.py b/src/macaron/database/table_definitions.py
index 542dd5679..fc010a74a 100644
--- a/src/macaron/database/table_definitions.py
+++ b/src/macaron/database/table_definitions.py
@@ -34,7 +34,7 @@
from macaron.artifact.maven import MavenSubjectPURLMatcher
from macaron.database.database_manager import ORMBase
-from macaron.database.rfc3339_datetime import RFC3339DateTime
+from macaron.database.db_custom_types import RFC3339DateTime
from macaron.errors import InvalidPURLError
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, ProvenanceSubjectPURLMatcher
from macaron.slsa_analyzer.slsa_req import ReqName
diff --git a/src/macaron/malware_analyzer/checks/__init__.py b/src/macaron/malware_analyzer/checks/__init__.py
deleted file mode 100644
index c8a50abb7..000000000
--- a/src/macaron/malware_analyzer/checks/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
-# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
diff --git a/src/macaron/malware_analyzer/datetime_parser.py b/src/macaron/malware_analyzer/datetime_parser.py
index 6ad6140d1..96461f7bf 100644
--- a/src/macaron/malware_analyzer/datetime_parser.py
+++ b/src/macaron/malware_analyzer/datetime_parser.py
@@ -12,14 +12,17 @@
def parse_datetime(datetime_str: str, datetime_format: str = "%Y-%m-%dT%H:%M:%S") -> datetime | None:
"""Parse a datetime string and handle errors.
- Args
- ----
- datetime_str (str): The datetime string to parse.
- datetime_format (str): The format to use for parsing the datetime string.
+ Parameters
+ ----------
+ datetime_str: str
+ The datetime string to parse.
+ datetime_format: str
+ The format to use for parsing the datetime string.
Returns
-------
- datetime: The parsed datetime object, or None if parsing failed.
+ datetime | None
+ The parsed datetime object, or None if parsing failed.
"""
try:
return datetime.strptime(datetime_str, datetime_format)
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/base_analyzer.py b/src/macaron/malware_analyzer/pypi_heuristics/base_analyzer.py
index f02024567..0c55b03fd 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/base_analyzer.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/base_analyzer.py
@@ -3,13 +3,15 @@
"""Define and initialize the base analyzer."""
+import abc
from abc import abstractmethod
+from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
-class BaseHeuristicAnalyzer:
+class BaseHeuristicAnalyzer(abc.ABC):
"""The base analyzer initialization."""
def __init__(
@@ -25,13 +27,17 @@ def __init__(
)
@abstractmethod
- def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
+ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""
Implement the base analyze method for seven analyzers.
+ Parameters
+ ----------
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
+
Returns
-------
- tuple[HeuristicResult, int | dict]: Contain the heuristic result and the metadata of the package.
- E.g. (1) The release frequency (2) {"maintainers_join_date": datetime}
+ tuple[HeuristicResult, dict[str, JsonType]]
+ The result and related information collected during the analysis.
"""
- raise NotImplementedError
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
index f4333f3db..0bd74d343 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
@@ -6,51 +6,43 @@
from enum import Enum
-class Heuristics(Enum):
- """Seven heuristics for detecting suspicious pypi package.
-
- Attributes
- ----------
- EMPTY_PROJECT_LINK : str
- Indicates that the package does not contain any project links (such as documentation or Git repository pages).
- UNREACHABLE_PROJECT_LINKS : str
- Indicates that the package contains project links, but all of them are unreachable.
- ONE_RELEASE : str
- Indicates that the package contains only one release.
- HIGH_RELEASE_FREQUENCY : str
- The package has a high release frequency. The average release time (calculated as the sum of all release gaps
- divided by the number of gaps) is below a set threshold, which defaults to 2 days.
- UNCHANGED_RELEASE : str
- Indicates that all releases contain the same content (with identical digests).
- CLOSER_RELEASE_JOIN_DATE : str
- Refers to the gap between the date the maintainer registered their account and the date of the latest release.
- SUSPICIOUS_SETUP : str
- Indicates that the setup.py file contains suspicious imports, such as base64 and requests.
- """
+class Heuristics(str, Enum):
+ """Seven heuristics for detecting suspicious pypi package."""
+ #: Indicates that the package does not contain any project links (such as documentation or Git repository pages).
EMPTY_PROJECT_LINK = "empty_project_link"
+
+ #: Indicates that the package contains project links, but all of them are unreachable.
UNREACHABLE_PROJECT_LINKS = "unreachable_project_links"
+
+ #: Indicates that the package contains only one release.
ONE_RELEASE = "one_release"
+
+ #: The package has a high release frequency. The average release time (calculated as the sum of all release gaps
+ #: divided by the number of gaps) is below a set threshold.
HIGH_RELEASE_FREQUENCY = "high_release_frequency"
+
+ #: Indicates that all releases contain the same content (with identical digests).
UNCHANGED_RELEASE = "unchanged_release"
- CLOSER_RELEASE_JOIN_DATE = "closer_release_join_date"
- SUSPICIOUS_SETUP = "suspicious_setup"
+ #: Indicates that the maintainer has registered their account close to the latest release date.
+ CLOSER_RELEASE_JOIN_DATE = "closer_release_join_date"
-class HeuristicResult(Enum):
- """Result type indicating the outcome of a heuristic.
+ #: Indicates that the setup.py file contains suspicious imports, such as base64 and requests.
+ SUSPICIOUS_SETUP = "suspicious_setup"
- Attributes
- ----------
- PASS: Indicates that no suspicious activity was detected.
- FAIL: Indicates that suspicious activity was detected.
- SKIP: Indicates that the heuristic check could not be performed due to missing metadata.
- The `SKIP` result occurs when the necessary metadata is not available. This often happens
- when fetching data through the PyPI API and the relevant data, such as the maintainer's
- join date or release information, is missing or unavailable.
- """
+class HeuristicResult(str, Enum):
+ """Result type indicating the outcome of a heuristic."""
+ #: Indicates that no suspicious activity was detected.
PASS = "PASS" # nosec B105
+
+ #: Indicates that suspicious activity was detected.
FAIL = "FAIL"
+
+ #: Indicates that the heuristic check could not be performed due to missing metadata.
+ #: The `SKIP` result occurs when the necessary metadata is not available. This often happens
+ #: when fetching data through the PyPI API and the relevant data, such as the maintainer's
+ #: join date or release information, is missing or unavailable.
SKIP = "SKIP"
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/closer_release_join_date.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/closer_release_join_date.py
index 706e35b7b..83333f3c9 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/closer_release_join_date.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/closer_release_join_date.py
@@ -6,10 +6,11 @@
from datetime import datetime, timedelta
from macaron.config.defaults import defaults
+from macaron.json_tools import JsonType
from macaron.malware_analyzer.datetime_parser import parse_datetime
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, PyPIRegistry
class CloserReleaseJoinDateAnalyzer(BaseHeuristicAnalyzer):
@@ -32,68 +33,72 @@ def _load_defaults(self) -> int:
return section.getint("timedelta_threshold_of_join_release")
return 5
- def _get_maintainers_join_date(self, api_client: PyPIRegistry) -> list[datetime] | None:
+ def _get_maintainers_join_date(self, pypi_registry: PyPIRegistry, package_name: str) -> list[datetime] | None:
"""Get the join date of the maintainers.
Each package might have multiple maintainers.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_registry: PyPIRegistry
+ The PyPI registry implementation.
+ package_name: str
+ The package name.
Returns
-------
list[datetime] | None
The maintainers' join date.
"""
- maintainers: list | None = api_client.get_maintainers_of_package()
+ maintainers: list | None = pypi_registry.get_maintainers_of_package(package_name)
if maintainers is None:
return None
join_dates: list[datetime] = []
for maintainer in maintainers:
- maintainer_join_date = api_client.get_maintainer_join_date(maintainer)
+ maintainer_join_date = pypi_registry.get_maintainer_join_date(maintainer)
if maintainer_join_date is not None:
join_dates.append(maintainer_join_date)
return join_dates
- def _get_latest_release_date(self, api_client: PyPIRegistry) -> datetime | None:
+ def _get_latest_release_date(self, pypi_package_json: PyPIPackageJsonAsset) -> datetime | None:
"""Get package's latest release date.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
datetime | None
The package's latest release date.
"""
- upload_time: str | None = api_client.get_latest_release_upload_time()
+ upload_time: str | None = pypi_package_json.get_latest_release_upload_time()
if not upload_time:
return None
datetime_format: str = "%Y-%m-%dT%H:%M:%S"
return parse_datetime(upload_time, datetime_format)
- def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
+ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Check whether the maintainers' join date closer to package's latest release date.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
- tuple[HeuristicResult, dict]
- The result and details.
+ tuple[HeuristicResult, dict[str, JsonType]]
+ The result and related information collected during the analysis.
"""
- maintainers_join_date: list[datetime] | None = self._get_maintainers_join_date(api_client)
- latest_release_date: datetime | None = self._get_latest_release_date(api_client)
- detail_info = {
+ maintainers_join_date: list[datetime] | None = self._get_maintainers_join_date(
+ pypi_package_json.pypi_registry, pypi_package_json.component.name
+ )
+ latest_release_date: datetime | None = self._get_latest_release_date(pypi_package_json)
+ detail_info: dict[str, JsonType] = {
"maintainers_join_date": (
[date.strftime("%Y-%m-%d %H:%M:%S") for date in maintainers_join_date] if maintainers_join_date else []
),
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/empty_project_link.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/empty_project_link.py
index 0803e289b..b6dd7ac80 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/empty_project_link.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/empty_project_link.py
@@ -3,9 +3,10 @@
"""Analyzer checks there is no project link of the package."""
+from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
class EmptyProjectLinkAnalyzer(BaseHeuristicAnalyzer):
@@ -14,23 +15,23 @@ class EmptyProjectLinkAnalyzer(BaseHeuristicAnalyzer):
def __init__(self) -> None:
super().__init__(name="empty_project_link_analyzer", heuristic=Heuristics.EMPTY_PROJECT_LINK, depends_on=None)
- def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
+ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Check whether the PyPI package has no project link.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
- tuple[HeuristicResult, dict]
- The result and project links if they exist. Otherwise, return an empty dictionary
+ tuple[HeuristicResult, dict[str, JsonType]]
+ The result and related information collected during the analysis.
"""
- project_links: dict[str, str] | None = api_client.get_project_links()
+ project_links = pypi_package_json.get_project_links()
if project_links is None:
- return HeuristicResult.SKIP, {}
+ return HeuristicResult.FAIL, {}
if len(project_links) == 0: # Total.
return HeuristicResult.FAIL, {}
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/high_release_frequency.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/high_release_frequency.py
index f3a48b462..e68b28dca 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/high_release_frequency.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/high_release_frequency.py
@@ -7,11 +7,11 @@
from datetime import datetime
from macaron.config.defaults import defaults
-from macaron.json_tools import json_extract
+from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.datetime_parser import parse_datetime
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
logger: logging.Logger = logging.getLogger(__name__)
@@ -35,20 +35,20 @@ def _load_defaults(self) -> int:
return section.getint("releases_frequency_threshold")
return 2
- def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
+ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Check whether the release frequency is high.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
- tuple[HeuristicResult, dict]
- The result and details.
+ tuple[HeuristicResult, dict[str, JsonType]]
+ The result and related information collected during the analysis.
"""
- version_to_releases: dict | None = api_client.get_releases()
+ version_to_releases: dict | None = pypi_package_json.get_releases()
if version_to_releases is None or len(version_to_releases) == 1:
return HeuristicResult.SKIP, {}
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/one_release.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/one_release.py
index 7ae156ee7..4a12b746a 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/one_release.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/one_release.py
@@ -4,9 +4,10 @@
"""Analyzer checks the packages contain one release."""
+from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
class OneReleaseAnalyzer(BaseHeuristicAnalyzer):
@@ -15,20 +16,20 @@ class OneReleaseAnalyzer(BaseHeuristicAnalyzer):
def __init__(self) -> None:
super().__init__(name="one_release_analyzer", heuristic=Heuristics.ONE_RELEASE, depends_on=None)
- def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
+ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Check the releases' total is one.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
- tuple[HeuristicResult, dict]
- The result and details.
+ tuple[HeuristicResult, dict[str, JsonType]]
+ The result and related information collected during the analysis.
"""
- releases: dict | None = api_client.get_releases()
+ releases: dict | None = pypi_package_json.get_releases()
if releases is None:
return HeuristicResult.SKIP, {"releases": {}}
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/unchanged_release.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/unchanged_release.py
index 172634429..278f3eeb5 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/unchanged_release.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/unchanged_release.py
@@ -5,10 +5,10 @@
import logging
from collections import Counter
-from macaron.json_tools import json_extract
+from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
logger: logging.Logger = logging.getLogger(__name__)
@@ -24,20 +24,20 @@ def __init__(self) -> None:
)
self.hash_algo: str = "sha256"
- def _get_digests(self, api_client: PyPIRegistry) -> list[str] | None:
+ def _get_digests(self, pypi_package_json: PyPIPackageJsonAsset) -> list[str] | None:
"""Get all digests of the releases.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
list[str] | None
The digests.
"""
- releases: dict | None = api_client.get_releases()
+ releases: dict | None = pypi_package_json.get_releases()
if releases is None:
return None
@@ -53,20 +53,20 @@ def _get_digests(self, api_client: PyPIRegistry) -> list[str] | None:
return digests
- def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
+ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Check the content of releases keep updating.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
- tuple[HeuristicResult, dict]
- The result and relevant metadata.
+ tuple[HeuristicResult, dict[str, JsonType]]
+ The result and related information collected during the analysis.
"""
- digests: list[str] | None = self._get_digests(api_client)
+ digests: list[str] | None = self._get_digests(pypi_package_json)
if digests is None:
return HeuristicResult.SKIP, {}
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/unreachable_project_links.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/unreachable_project_links.py
index de370b356..8824c7a25 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/unreachable_project_links.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/unreachable_project_links.py
@@ -7,9 +7,10 @@
import requests
+from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
logger: logging.Logger = logging.getLogger(__name__)
@@ -28,20 +29,20 @@ def __init__(self) -> None:
depends_on=[(Heuristics.EMPTY_PROJECT_LINK, HeuristicResult.PASS)],
)
- def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
+ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Analyze the package.
Parameters
----------
- api_client: PyPIRegistry
- The API client.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
- tuple[HeuristicResult, dict]
- The result type and relevant metadata.
+ tuple[HeuristicResult, dict[str, JsonType]]
+ The result and related information collected during the analysis.
"""
- project_links: dict | None = api_client.get_project_links()
+ project_links: dict | None = pypi_package_json.get_project_links()
if project_links is None:
return HeuristicResult.SKIP, {}
diff --git a/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/suspicious_setup.py b/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/suspicious_setup.py
index 2e5d2760a..5f7f130a7 100644
--- a/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/suspicious_setup.py
+++ b/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/suspicious_setup.py
@@ -14,9 +14,10 @@
import requests
from requests import RequestException
+from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
logger: logging.Logger = logging.getLogger(__name__)
@@ -28,20 +29,20 @@ def __init__(self) -> None:
super().__init__(name="suspicious_setup_analyzer", heuristic=Heuristics.SUSPICIOUS_SETUP, depends_on=None)
self.blacklist: list = ["base64", "request"]
- def _get_setup_source_code(self, api_client: PyPIRegistry) -> str | None:
+ def _get_setup_source_code(self, pypi_package_json: PyPIPackageJsonAsset) -> str | None:
"""Get the source code in setup.py.
Parameters
----------
- api_client: PyPIRegistry
- The API client to use for source code retrieval.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
str | None
The source code.
"""
- sourcecode_url: str | None = api_client.get_sourcecode_url()
+ sourcecode_url: str | None = pypi_package_json.get_sourcecode_url()
if sourcecode_url is None:
return None
@@ -113,20 +114,20 @@ def _get_setup_source_code(self, api_client: PyPIRegistry) -> str | None:
with open(final_path, encoding="utf-8") as file:
return file.read()
- def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
+ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Analyze suspicious packages are imported in setup.py.
Parameters
----------
- api_client: PyPIRegistry
- The API client to use for analysis.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
- tuple[HeuristicResult, dict]
- The Result and details.
+ tuple[HeuristicResult, dict[str, JsonType]]
+ The result and related information collected during the analysis.
"""
- content: str | None = self._get_setup_source_code(api_client)
+ content: str | None = self._get_setup_source_code(pypi_package_json)
if content is None:
return HeuristicResult.SKIP, {}
@@ -144,8 +145,8 @@ def analyze(self, api_client: PyPIRegistry) -> tuple[HeuristicResult, dict]:
suspicious_setup = any(suspicious_keyword in imp for imp in imports for suspicious_keyword in self.blacklist)
if suspicious_setup:
- return HeuristicResult.FAIL, {"import_module": imports}
- return HeuristicResult.PASS, {"import_module": imports}
+ return HeuristicResult.FAIL, {"import_module": list(imports)}
+ return HeuristicResult.PASS, {"import_module": list(imports)}
def extract_from_ast(self, source_content: str) -> set[str]:
"""Extract imports from source code using the parsed AST.
diff --git a/src/macaron/policy_engine/souffle_code_generator.py b/src/macaron/policy_engine/souffle_code_generator.py
index f0b11e904..b768ba5a7 100644
--- a/src/macaron/policy_engine/souffle_code_generator.py
+++ b/src/macaron/policy_engine/souffle_code_generator.py
@@ -7,7 +7,7 @@
import os
from sqlalchemy import Column, Float, MetaData, Table
-from sqlalchemy.sql.sqltypes import Boolean, Integer, String, Text
+from sqlalchemy.sql.sqltypes import JSON, Boolean, Integer, String, Text
logger: logging.Logger = logging.getLogger(__name__)
@@ -87,6 +87,8 @@ def column_to_souffle_type(column: Column) -> str:
souffle_type = "symbol"
elif isinstance(sql_type, Boolean):
souffle_type = "number"
+ elif isinstance(sql_type, JSON):
+ souffle_type = "symbol"
else:
raise ValueError("Unexpected column type in table")
return souffle_type
diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py
index 89ba06b30..6cff9716a 100644
--- a/src/macaron/slsa_analyzer/analyzer.py
+++ b/src/macaron/slsa_analyzer/analyzer.py
@@ -32,9 +32,6 @@
PURLNotFoundError,
RepoCheckOutError,
)
-from macaron.malware_analyzer.checks import ( # pylint: disable=unused-import # noqa: F401
- detect_malicious_metadata_check,
-)
from macaron.output_reporter.reporter import FileReporter
from macaron.output_reporter.results import Record, Report, SCMStatus
from macaron.repo_finder import repo_finder
@@ -1074,8 +1071,10 @@ def perform_checks(self, analyze_ctx: AnalyzeContext) -> dict[str, CheckResult]:
)
# Determine the package registries.
- # We match the repo against package registries through build tools.
- build_tools = analyze_ctx.dynamic_data["build_spec"]["tools"]
+ # We match the software component against package registries through build tools.
+ build_tools = (
+ analyze_ctx.dynamic_data["build_spec"]["tools"] or analyze_ctx.dynamic_data["build_spec"]["purl_tools"]
+ )
for package_registry in PACKAGE_REGISTRIES:
for build_tool in build_tools:
if package_registry.is_detected(build_tool):
diff --git a/src/macaron/malware_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py
similarity index 67%
rename from src/macaron/malware_analyzer/checks/detect_malicious_metadata_check.py
rename to src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py
index 50871dc6a..d8755cddf 100644
--- a/src/macaron/malware_analyzer/checks/detect_malicious_metadata_check.py
+++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py
@@ -5,11 +5,12 @@
import logging
-from packageurl import PackageURL
-from sqlalchemy import ForeignKey, String
+from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
+from macaron.database.db_custom_types import DBJsonDict
from macaron.database.table_definitions import CheckFacts
+from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer
@@ -20,15 +21,18 @@
from macaron.malware_analyzer.pypi_heuristics.metadata.unreachable_project_links import UnreachableProjectLinksAnalyzer
from macaron.malware_analyzer.pypi_heuristics.sourcecode.suspicious_setup import SuspiciousSetupAnalyzer
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
+from macaron.slsa_analyzer.build_tool.pip import Pip
+from macaron.slsa_analyzer.build_tool.poetry import Poetry
from macaron.slsa_analyzer.checks.base_check import BaseCheck
from macaron.slsa_analyzer.checks.check_result import CheckResultData, CheckResultType, Confidence, JustificationType
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, PyPIRegistry
from macaron.slsa_analyzer.registry import registry
+from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo
logger: logging.Logger = logging.getLogger(__name__)
-class HeuristicAnalysisResultFacts(CheckFacts):
+class MaliciousMetadataFacts(CheckFacts):
"""The ORM mapping for justifications in pypi heuristic check."""
__tablename__ = "_detect_malicious_metadata_check"
@@ -36,17 +40,12 @@ class HeuristicAnalysisResultFacts(CheckFacts):
#: The primary key.
id: Mapped[int] = mapped_column(ForeignKey("_check_facts.id"), primary_key=True) # noqa: A003
- #: List of heuristic names that failed.
- heuristics_fail: Mapped[str] = mapped_column(String, nullable=False, info={"justification": JustificationType.TEXT})
-
#: Detailed information about the analysis.
- detail_information: Mapped[str] = mapped_column(
- String, nullable=False, info={"justification": JustificationType.TEXT}
- )
+ detail_information: Mapped[dict[str, JsonType]] = mapped_column(DBJsonDict, nullable=False)
- #: The result of heuristic analysis.
- heuristic_result: Mapped[str] = mapped_column(
- String, nullable=False, info={"justification": JustificationType.TEXT}
+ #: The analysis result, a mapping of type dict[Heuristics, HeuristicResult].
+ result: Mapped[dict[Heuristics, HeuristicResult]] = mapped_column(
+ DBJsonDict, nullable=False, info={"justification": JustificationType.TEXT}
)
__mapper_args__ = {
@@ -91,17 +90,6 @@ class HeuristicAnalysisResultFacts(CheckFacts):
# after account registration.
# The setup.py file contains suspicious imports.
): Confidence.HIGH,
- (
- HeuristicResult.FAIL, # Empty Project
- HeuristicResult.SKIP, # Unreachable Project Links
- HeuristicResult.FAIL, # One Release
- HeuristicResult.SKIP, # High Release Frequency
- HeuristicResult.SKIP, # Unchanged Release
- HeuristicResult.FAIL, # Closer Release Join Date
- HeuristicResult.PASS, # Suspicious Setup
- # No project link, only one release, and the maintainer released it shortly
- # after account registration.
- ): Confidence.MEDIUM,
(
HeuristicResult.FAIL, # Empty Project
HeuristicResult.SKIP, # Unreachable Project Links
@@ -137,17 +125,6 @@ class HeuristicAnalysisResultFacts(CheckFacts):
# No project link, frequent releases of multiple versions without modifying the content,
# and the maintainer released it shortly after account registration.
): Confidence.MEDIUM,
- (
- HeuristicResult.FAIL, # Empty Project
- HeuristicResult.SKIP, # Unreachable Project Links
- HeuristicResult.PASS, # One Release
- HeuristicResult.FAIL, # High Release Frequency
- HeuristicResult.PASS, # Unchanged Release
- HeuristicResult.FAIL, # Closer Release Join Date
- HeuristicResult.PASS, # Suspicious Setup
- # No project link, frequent releases of multiple versions,
- # and the maintainer released it shortly after account registration.
- ): Confidence.LOW,
(
HeuristicResult.PASS, # Empty Project
HeuristicResult.FAIL, # Unreachable Project Links
@@ -160,17 +137,6 @@ class HeuristicAnalysisResultFacts(CheckFacts):
# and the maintainer released it shortly after account registration.
# The setup.py file contains suspicious imports.
): Confidence.HIGH,
- # (
- # HeuristicResult.PASS, # Empty Project
- # HeuristicResult.FAIL, # Unreachable Project Links
- # HeuristicResult.PASS, # One Release
- # HeuristicResult.FAIL, # High Release Frequency
- # HeuristicResult.PASS, # Unchanged Release
- # HeuristicResult.FAIL, # Closer Release Join Date
- # HeuristicResult.PASS, # Suspicious Setup
- # # All project links are unreachable, frequent releases of multiple versions,
- # # and the maintainer released it shortly after account registration.
- # ): Confidence.LOW,
}
@@ -212,22 +178,22 @@ def _should_skip(
return False
def run_heuristics(
- self, api_client: PyPIRegistry
- ) -> tuple[dict[Heuristics, HeuristicResult], dict[str, int | dict]]:
- """Run the main logic of heuristics analysis.
+ self, pypi_package_json: PyPIPackageJsonAsset
+ ) -> tuple[dict[Heuristics, HeuristicResult], dict[str, JsonType]]:
+ """Run the analysis heuristics.
Parameters
----------
- api_client: PyPIRegistry
- The PyPI API client object used to interact with the PyPI API.
+ pypi_package_json: PyPIPackageJsonAsset
+ The PyPI package JSON asset object.
Returns
-------
- tuple[dict[Heuristics, HeuristicResult], dict[str, int | dict]]
- Containing the heuristics' results and relevant metadata.
+ tuple[dict[Heuristics, HeuristicResult], dict[str, JsonType]]
+ Containing the analysis results and relevant metadata.
"""
results: dict[Heuristics, HeuristicResult] = {}
- detail_infos: dict[str, int | dict] = {}
+ detail_info: dict[str, JsonType] = {}
for _analyzer in ANALYZERS:
analyzer: BaseHeuristicAnalyzer = _analyzer()
logger.debug("Instantiating %s", _analyzer.__name__)
@@ -239,11 +205,11 @@ def run_heuristics(
results[analyzer.heuristic] = HeuristicResult.SKIP
continue
- result, detail_info = analyzer.analyze(api_client)
+ result, result_info = analyzer.analyze(pypi_package_json)
if analyzer.heuristic:
results[analyzer.heuristic] = result
- detail_infos.update(detail_info)
- return results, detail_infos
+ detail_info.update(result_info)
+ return results, detail_info
def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
"""Implement the check in this method.
@@ -258,39 +224,47 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
CheckResultData
The result of the check.
"""
- parsed_purl = PackageURL.from_string(ctx.component.purl)
- if parsed_purl.type != "pypi":
- return CheckResultData(result_tables=[], result_type=CheckResultType.UNKNOWN)
- package = parsed_purl.name
- result_tables: list[CheckFacts] = []
-
- api_client: PyPIRegistry = PyPIRegistry()
- api_client.load_defaults()
- api_client.download_attestation_payload(package)
- result, detail_infos = self.run_heuristics(api_client)
- heuristics_fail: list[str] = [
- heuristic.value for heuristic, result in result.items() if result is HeuristicResult.FAIL
- ]
- result_combo: tuple = tuple(result.values())
- confidence: float | None = SUSPICIOUS_COMBO.get(result_combo, None)
- result_type = CheckResultType.FAILED
- if confidence is None:
- confidence = Confidence.HIGH
- result_type = CheckResultType.PASSED
-
- result_tables.append(
- HeuristicAnalysisResultFacts(
- heuristics_fail=str(heuristics_fail),
- heuristic_result=str(result),
- detail_information=str(detail_infos),
- confidence=confidence,
- )
- )
-
- return CheckResultData(
- result_tables=result_tables,
- result_type=result_type,
- )
+ package_registry_info_entries = ctx.dynamic_data["package_registries"]
+ for package_registry_info_entry in package_registry_info_entries:
+ match package_registry_info_entry:
+ case PackageRegistryInfo(
+ build_tool=Pip() | Poetry(),
+ package_registry=PyPIRegistry() as pypi_registry,
+ ) as pypi_registry_info:
+ result_tables: list[CheckFacts] = []
+
+ # Create an AssetLocator object for the PyPI package JSON object.
+ pypi_package_json = PyPIPackageJsonAsset(
+ component=ctx.component, pypi_registry=pypi_registry, package_json={}
+ )
+
+ pypi_registry_info.metadata.append(pypi_package_json)
+
+ # Download the PyPI package JSON, but no need to persist it to the filesystem.
+ if pypi_package_json.download(dest=""):
+ result, detail_info = self.run_heuristics(pypi_package_json)
+ result_combo: tuple = tuple(result.values())
+ confidence: float | None = SUSPICIOUS_COMBO.get(result_combo, None)
+ result_type = CheckResultType.FAILED
+ if confidence is None:
+ confidence = Confidence.HIGH
+ result_type = CheckResultType.PASSED
+
+ result_tables.append(
+ MaliciousMetadataFacts(
+ result=result,
+ detail_information=detail_info,
+ confidence=confidence,
+ )
+ )
+
+ return CheckResultData(
+ result_tables=result_tables,
+ result_type=result_type,
+ )
+
+ # Return UNKNOWN result for unsupported ecosystems.
+ return CheckResultData(result_tables=[], result_type=CheckResultType.UNKNOWN)
registry.register(DetectMaliciousMetadataCheck())
diff --git a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py
index dc257e973..deffc50bf 100644
--- a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py
+++ b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py
@@ -5,13 +5,15 @@
import logging
import os
+import urllib.parse
+from dataclasses import dataclass
from datetime import datetime
-from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup, Tag
from macaron.config.defaults import defaults
+from macaron.database.table_definitions import Component
from macaron.errors import ConfigurationError, InvalidHTTPResponseError
from macaron.json_tools import json_extract
from macaron.malware_analyzer.datetime_parser import parse_datetime
@@ -28,7 +30,10 @@ class PyPIRegistry(PackageRegistry):
def __init__(
self,
- hostname: str | None = None,
+ registry_url_netloc: str | None = None,
+ registry_url_scheme: str | None = None,
+ fileserver_url_netloc: str | None = None,
+ fileserver_url_scheme: str | None = None,
request_timeout: int | None = None,
enabled: bool = True,
) -> None:
@@ -37,20 +42,27 @@ def __init__(
Parameters
----------
- hostname: str | None
- The hostname of the pypi registry.
+ registry_url_netloc: str | None
+ The netloc of the pypi registry url.
+ registry_url_scheme: str | None
+ The scheme of the pypi registry url.
+ fileserver_url_netloc: str | None
+ The netloc (hostname and port) of the file server URL that hosts the package source files.
+ fileserver_url_scheme: str | None
+ The scheme of the file server URL that hosts the package source files.
request_timeout: int | None
The timeout (in seconds) for requests made to the package registry.
enabled: bool
Shows whether making REST API calls to pypi registry is enabled.
"""
- self.hostname = hostname or ""
+ self.registry_url_netloc = registry_url_netloc or ""
+ self.registry_url_scheme = registry_url_scheme or ""
+ self.fileserver_url_netloc = fileserver_url_netloc or ""
+ self.fileserver_url_scheme = fileserver_url_scheme or ""
self.request_timeout = request_timeout or 10
self.enabled = enabled
- self.attestation: dict = {}
- self.base_url = ""
- self.package = ""
+ self.registry_url = ""
super().__init__("PyPI Registry")
def load_defaults(self) -> None:
@@ -66,12 +78,28 @@ def load_defaults(self) -> None:
return
section = defaults[section_name]
- self.hostname = section.get("hostname")
- if not self.hostname:
+ self.registry_url_netloc = section.get("registry_url_netloc")
+ if not self.registry_url_netloc:
raise ConfigurationError(
- f'The "hostname" key is missing in section [{section_name}] of the .ini configuration file.'
+ f'The "registry_url_netloc" key is missing in section [{section_name}] of the .ini configuration file.'
)
- self.base_url = f"https://{self.hostname}"
+ self.registry_url_scheme = section.get("registry_url_scheme", "https")
+ self.registry_url = urllib.parse.ParseResult(
+ scheme=self.registry_url_scheme,
+ netloc=self.registry_url_netloc,
+ path="",
+ params="",
+ query="",
+ fragment="",
+ ).geturl()
+
+ fileserver_url_netloc = section.get("fileserver_url_netloc")
+ if not fileserver_url_netloc:
+ raise ConfigurationError(
+ f'The "fileserver_url_netloc" key is missing in section [{section_name}] of the .ini configuration file.'
+ )
+ self.fileserver_url_netloc = fileserver_url_netloc
+ self.fileserver_url_scheme = section.get("fileserver_url_scheme", "https")
try:
self.request_timeout = section.getint("request_timeout", fallback=10)
@@ -107,135 +135,73 @@ def is_detected(self, build_tool: BaseBuildTool) -> bool:
return True
return False
- def download_attestation_payload(self, package: str) -> bool:
- """Download the pypi attestation from pypi registry.
+ def download_package_json(self, url: str) -> dict:
+ """Download the package JSON metadata from pypi registry.
Parameters
----------
- package: str
- The package name.
+ url: str
+ The package JSON url.
Returns
-------
- bool
- ``True`` if the asset is downloaded successfully; ``False`` if not.
+ dict
+ The JSON response if the request is successful.
Raises
------
InvalidHTTPResponseError
If the HTTP request to the registry fails or an unexpected response is returned.
"""
- self.package = package
- attestation_endpoint = f"pypi/{package}/json"
- url = urljoin(self.base_url, attestation_endpoint)
response = send_get_http_raw(url, headers=None, timeout=self.request_timeout)
if not response:
- logger.debug("Unable to find attestation for %s", package)
- return False
+ logger.debug("Unable to find package JSON metadata using URL: %s", url)
+ raise InvalidHTTPResponseError(f"Unable to find package JSON metadata using URL: {url}.")
try:
res_obj = response.json()
except requests.exceptions.JSONDecodeError as error:
raise InvalidHTTPResponseError(f"Failed to process response from pypi for {url}.") from error
- if not res_obj:
+ if not isinstance(res_obj, dict):
raise InvalidHTTPResponseError(f"Empty response returned by {url} .")
- self.attestation = res_obj
-
- return True
-
- def get_releases(self) -> dict | None:
- """Get all releases.
-
- Returns
- -------
- dict | None
- Version to metadata.
- """
- return json_extract(self.attestation, ["releases"], dict)
-
- def get_project_links(self) -> dict[str, str] | None:
- """Retrieve the project links from the base metadata.
-
- This method accesses the "info" section of the base metadata to extract the "project_urls" dictionary,
- which contains various links related to the project.
-
- Returns
- -------
- dict[str, str] | None
- Containing project URLs where the keys are the names of the links
- and the values are the corresponding URLs. Returns None if the "project_urls"
- section is not found in the base metadata.
- """
- return json_extract(self.attestation, ["info", "project_urls"], dict)
-
- def get_latest_version(self) -> str | None:
- """Get the latest version of the package.
-
- Returns
- -------
- str | None
- The latest version.
- """
- return json_extract(self.attestation, ["info", "version"], str)
-
- def get_sourcecode_url(self) -> str | None:
- """Get the url of the source distribution.
- Returns
- -------
- str | None
- The URL of the source distribution.
- """
- urls: list | None = json_extract(self.attestation, ["urls"], list)
- if not urls:
- return None
- for distribution in urls:
- if distribution.get("python_version") != "source":
- continue
- source: str = distribution.get("url", "")
- if source:
- return source
- return None
-
- def get_latest_release_upload_time(self) -> str | None:
- """Get upload time of the latest release.
-
- Returns
- -------
- str | None
- The upload time of the latest release.
- """
- urls: list | None = json_extract(self.attestation, ["urls"], list)
- if urls is not None and urls:
- upload_time: str | None = urls[0].get("upload_time")
- return upload_time
- return None
+ return res_obj
- def get_package_page(self) -> str | None:
+ def get_package_page(self, package_name: str) -> str | None:
"""Implement custom API to get package main page.
+ Parameters
+ ----------
+ package_name: str
+ The package name.
+
Returns
-------
str | None
The package main page.
"""
- url = os.path.join(self.base_url, "project", self.package)
+ url = urllib.parse.urljoin(self.registry_url, f"project/{package_name}")  # URLs always use "/", unlike os.path.join.
response = send_get_http_raw(url)
if response:
html_snippets = response.content.decode("utf-8")
return html_snippets
return None
- def get_maintainers_of_package(self) -> list | None:
+ def get_maintainers_of_package(self, package_name: str) -> list | None:
"""Implement custom API to get all maintainers of the package.
+ Parameters
+ ----------
+ package_name: str
+ The package name.
+
Returns
-------
list | None
The list of maintainers.
"""
- package_page: str | None = self.get_package_page()
+ package_page: str | None = self.get_package_page(package_name)
if package_page is None:
return None
soup = BeautifulSoup(package_page, "html.parser")
@@ -255,7 +221,7 @@ def get_maintainer_profile_page(self, username: str) -> str | None:
str | None
The profile page.
"""
- url = os.path.join(self.base_url, "user", username)
+ url = os.path.join(self.registry_url, "user", username)
response = send_get_http_raw(url, headers=None)
if response:
html_snippets = response.content.decode("utf-8")
@@ -305,3 +271,146 @@ def get_maintainer_join_date(self, username: str) -> datetime | None:
res: datetime | None = parse_datetime(datetime_val, datetime_format)
return res.replace(tzinfo=None) if res else None
+
+
+@dataclass
+class PyPIPackageJsonAsset:
+ """The package JSON hosted on the PyPI registry."""
+
+ #: The target pypi software component.
+ component: Component
+
+ #: The pypi registry.
+ pypi_registry: PyPIRegistry
+
+ #: The asset content.
+ package_json: dict
+
+ #: The size of the asset (in bytes). This attribute is only present to match the AssetLocator
+ #: protocol and is not used, because the PyPI registry API does not provide the size.
+ @property
+ def size_in_bytes(self) -> int:
+ """Get the size of asset."""
+ return -1
+
+ @property
+ def name(self) -> str:
+ """Get the asset name."""
+ return "package_json"
+
+ @property
+ def url(self) -> str:
+ """Get the download URL of the asset.
+
+ Note: we assume that the path parameters used to construct the URL are sanitized already.
+
+ Returns
+ -------
+ str
+ """
+ json_endpoint = f"pypi/{self.component.name}/json"
+ return urllib.parse.urljoin(self.pypi_registry.registry_url, json_endpoint)
+
+ def download(self, dest: str) -> bool: # pylint: disable=unused-argument
+ """Download the package JSON metadata and store it in the package_json attribute.
+
+ Returns
+ -------
+ bool
+ ``True`` if the asset is downloaded successfully; ``False`` if not.
+ """
+ try:
+ self.package_json = self.pypi_registry.download_package_json(self.url)
+ return True
+ except InvalidHTTPResponseError as error:
+ logger.debug(error)
+ return False
+
+ def get_releases(self) -> dict | None:
+ """Get all releases.
+
+ Returns
+ -------
+ dict | None
+ Version to metadata.
+ """
+ return json_extract(self.package_json, ["releases"], dict)
+
+ def get_project_links(self) -> dict | None:
+ """Retrieve the project links from the base metadata.
+
+ This method accesses the "info" section of the base metadata to extract the "project_urls" dictionary,
+ which contains various links related to the project.
+
+ Returns
+ -------
+ dict | None
+ Containing project URLs where the keys are the names of the links
+ and the values are the corresponding URLs. Returns None if the "project_urls"
+ section is not found in the base metadata.
+ """
+ return json_extract(self.package_json, ["info", "project_urls"], dict)
+
+ def get_latest_version(self) -> str | None:
+ """Get the latest version of the package.
+
+ Returns
+ -------
+ str | None
+ The latest version.
+ """
+ return json_extract(self.package_json, ["info", "version"], str)
+
+ def get_sourcecode_url(self) -> str | None:
+ """Get the url of the source distribution.
+
+ Returns
+ -------
+ str | None
+ The URL of the source distribution.
+ """
+ urls: list | None = None
+ if self.component.version:
+ urls = json_extract(self.package_json, ["releases", self.component.version], list)
+ else:
+ # Get the latest version.
+ urls = json_extract(self.package_json, ["urls"], list)
+ if not urls:
+ return None
+ for distribution in urls:
+ if distribution.get("packagetype") != "sdist":
+ continue
+ # We intentionally check if the url is None and use empty string if that's the case.
+ source_url: str = distribution.get("url") or ""
+ if source_url:
+ try:
+ parsed_url = urllib.parse.urlparse(source_url)
+ except ValueError:
+ logger.debug("Error occurred while processing the source URL %s.", source_url)
+ return None
+ if self.pypi_registry.fileserver_url_netloc and self.pypi_registry.fileserver_url_scheme:
+ configured_source_url = urllib.parse.ParseResult(
+ scheme=self.pypi_registry.fileserver_url_scheme,
+ netloc=self.pypi_registry.fileserver_url_netloc,
+ path=parsed_url.path,
+ params="",
+ query="",
+ fragment="",
+ ).geturl()
+ logger.debug("Found source URL: %s", configured_source_url)
+ return configured_source_url
+ return None
+
+ def get_latest_release_upload_time(self) -> str | None:
+ """Get upload time of the latest release.
+
+ Returns
+ -------
+ str | None
+ The upload time of the latest release.
+ """
+ urls: list | None = json_extract(self.package_json, ["urls"], list)
+ if urls is not None and urls:
+ upload_time: str | None = urls[0].get("upload_time")
+ return upload_time
+ return None
diff --git a/src/macaron/slsa_analyzer/specs/package_registry_spec.py b/src/macaron/slsa_analyzer/specs/package_registry_spec.py
index 110a02c3e..e28d9c6d8 100644
--- a/src/macaron/slsa_analyzer/specs/package_registry_spec.py
+++ b/src/macaron/slsa_analyzer/specs/package_registry_spec.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
@@ -6,6 +6,7 @@
from dataclasses import dataclass, field
+from macaron.slsa_analyzer.asset import AssetLocator
from macaron.slsa_analyzer.build_tool import BaseBuildTool
from macaron.slsa_analyzer.package_registry import PackageRegistry
from macaron.slsa_analyzer.provenance.provenance import DownloadedProvenanceData
@@ -21,3 +22,5 @@ class PackageRegistryInfo:
package_registry: PackageRegistry
#: The provenances matched against the current repo.
provenances: list[DownloadedProvenanceData] = field(default_factory=list)
+ #: The metadata assets obtained from the package registry.
+ metadata: list[AssetLocator] = field(default_factory=list)
diff --git a/tests/conftest.py b/tests/conftest.py
index 2b8c580bb..894f8db12 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -3,8 +3,7 @@
"""Fixtures for tests."""
from pathlib import Path
-from typing import NoReturn
-from unittest.mock import MagicMock
+from typing import Any, NoReturn
import pytest
@@ -12,11 +11,6 @@
from macaron.code_analyzer.call_graph import BaseNode, CallGraph
from macaron.config.defaults import create_defaults, defaults, load_defaults
from macaron.database.table_definitions import Analysis, Component, Repository
-from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer
-from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer
-from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer
-from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer
-from macaron.malware_analyzer.pypi_heuristics.metadata.unchanged_release import UnchangedReleaseAnalyzer
from macaron.parsers.bashparser import BashScriptType, create_bash_node
from macaron.parsers.github_workflow_model import Identified, Job, NormalJob, RunStep, Workflow
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
@@ -40,7 +34,6 @@
from macaron.slsa_analyzer.ci_service.gitlab_ci import GitLabCI
from macaron.slsa_analyzer.ci_service.jenkins import Jenkins
from macaron.slsa_analyzer.ci_service.travis import Travis
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
# We need to pass fixture names as arguments to maintain an order.
# pylint: disable=redefined-outer-name
@@ -403,11 +396,20 @@ def get_git_services(
class MockAnalyzeContext(AnalyzeContext):
"""This class initializes a Component for the AnalyzeContext."""
- def __init__(self, *args, **kwargs) -> None: # type: ignore
+ def __init__(
+ self,
+ *args: Any,
+ purl: str | None = None,
+ complete_name: str | None = None,
+ fs_path: str | None = None,
+ **kwargs: Any,
+ ) -> None:
component = Component(
- purl="pkg:github.com/package-url/purl-spec@244fd47e07d1004f0aed9c",
+ purl=purl or "pkg:github.com/package-url/purl-spec@244fd47e07d1004f0aed9c",
analysis=Analysis(),
- repository=Repository(complete_name="github.com/package-url/purl-spec", fs_path=""),
+ repository=Repository(
+ complete_name=complete_name or "github.com/package-url/purl-spec", fs_path=fs_path or ""
+ ),
)
super().__init__(component, *args, **kwargs)
@@ -453,96 +455,3 @@ def build_github_actions_call_graph_for_commands(commands: list[str]) -> CallGra
)
return gh_cg
-
-
-@pytest.fixture(autouse=True)
-def one_release_analyzer() -> dict:
- """Create an one-release-analyzer setup.
-
- Returns
- -------
- dict
- Setup data for the test.
- """
- package_with_one_release = "ttttttttest-nester.py"
- mock_api_client_fail = MagicMock(spec=PyPIRegistry())
- mock_api_client_fail.load_defaults()
- mock_api_client_fail.download_attestation_payload(package=package_with_one_release)
-
- package_with_many_releases = "requests"
- mock_api_client_pass = MagicMock(spec=PyPIRegistry())
- mock_api_client_pass.load_defaults()
- mock_api_client_pass.download_attestation_payload(package=package_with_many_releases)
- analyzer = OneReleaseAnalyzer()
-
- return {
- "package_with_one_release": package_with_one_release,
- "package_with_many_releases": package_with_many_releases,
- "mock_api_client_fail": mock_api_client_fail,
- "mock_api_client_pass": mock_api_client_pass,
- "analyzer": analyzer,
- }
-
-
-@pytest.fixture(autouse=True)
-def setup_empty_project_link_analyzer() -> dict:
- """Create an empty-project-link-analyzer setup.
-
- Returns
- -------
- dict: Setup data for the test.
- """
- package_with_links = "requests"
- mock_api_client_pass = MagicMock(spec=PyPIRegistry)
- mock_api_client_pass.load_defaults()
- mock_api_client_pass.download_attestation_payload(package=package_with_links)
-
- package_no_links = "sfy_hello"
- mock_api_client_fail = MagicMock(spec=PyPIRegistry)
- mock_api_client_fail.load_defaults()
- mock_api_client_fail.download_attestation_payload(package=package_no_links)
- analyzer = EmptyProjectLinkAnalyzer()
- package_links = {
- "Documentation": "https://requests.readthedocs.io",
- "Homepage": "https://requests.readthedocs.io",
- "Source": "https://github.com/psf/requests",
- }
-
- return {
- "package_with_links": package_with_links,
- "package_no_links": package_no_links,
- "mock_api_client_pass": mock_api_client_pass,
- "mock_api_client_fail": mock_api_client_fail,
- "analyzer": analyzer,
- "package_links": package_links,
- }
-
-
-@pytest.fixture(autouse=True)
-def setup_closer_release_join_date_analyzer() -> tuple:
- """Fixture for setting up the CloserReleaseJoinDateAnalyzer and a mock PyPIRegistry client.
-
- Returns
- -------
- tuple:
- A tuple containing the analyzer and the mocked api_client.
- """
- analyzer = CloserReleaseJoinDateAnalyzer()
- api_client = MagicMock(spec=PyPIRegistry)
- return analyzer, api_client
-
-
-@pytest.fixture(autouse=True)
-def setup_high_release_frequency_analyzer() -> tuple:
- """Fixture for setting up the HighReleaseFrequencyAnalyzer and a mock PyPIRegistry client."""
- analyzer = HighReleaseFrequencyAnalyzer()
- api_client = MagicMock(spec=PyPIRegistry)
- return analyzer, api_client
-
-
-@pytest.fixture(autouse=True)
-def setup_unchanged_release_analyzer() -> tuple:
- """Fixture for setting up the UnchangedReleaseAnalyzer and a mock PyPIRegistry client."""
- analyzer = UnchangedReleaseAnalyzer()
- api_client = MagicMock(spec=PyPIRegistry)
- return analyzer, api_client
diff --git a/tests/integration/cases/django_with_dep_resolution_virtual_env_as_input/policy.dl b/tests/integration/cases/django_with_dep_resolution_virtual_env_as_input/policy.dl
index 35c23839a..859b960a2 100644
--- a/tests/integration/cases/django_with_dep_resolution_virtual_env_as_input/policy.dl
+++ b/tests/integration/cases/django_with_dep_resolution_virtual_env_as_input/policy.dl
@@ -10,7 +10,11 @@ Policy("check-dependencies", component_id, "Check the dependencies of django@5.0
match("pkg:pypi/sqlparse@0.*", sqlparse_purl),
transitive_dependency(component_id, asgiref),
is_component(asgiref, asgiref_purl),
- match("pkg:pypi/asgiref@3.*", asgiref_purl).
+ match("pkg:pypi/asgiref@3.*", asgiref_purl),
+ check_passed(component_id, "mcn_detect_malicious_metadata_1"),
+ check_passed(sqlparse, "mcn_detect_malicious_metadata_1"),
+ check_passed(asgiref, "mcn_detect_malicious_metadata_1").
+
apply_policy_to("check-dependencies", component_id) :-
is_component(component_id, "pkg:pypi/django@5.0.6").
diff --git a/tests/malware_analyzer/pypi/conftest.py b/tests/malware_analyzer/pypi/conftest.py
new file mode 100644
index 000000000..a5f775531
--- /dev/null
+++ b/tests/malware_analyzer/pypi/conftest.py
@@ -0,0 +1,27 @@
+# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
+
+"""This module contains test configurations for the malware analyzer."""
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from macaron.database.table_definitions import Analysis, Component
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, PyPIRegistry
+
+
+@pytest.fixture(autouse=True)
+def pypi_package_json() -> MagicMock:
+ """Fixture for setting up a mock PyPI package JSON asset object.
+
+ Returns
+ -------
+ MagicMock:
+ A PyPIPackageJsonAsset MagicMock.
+ """
+ pypi_registry = MagicMock(spec=PyPIRegistry)
+ pypi_package = MagicMock(spec=PyPIPackageJsonAsset)
+ pypi_package.pypi_registry = pypi_registry
+ pypi_package.component = Component(purl="pkg:pypi/package", analysis=Analysis(), repository=None)
+ return pypi_package
diff --git a/tests/malware_analyzer/pypi/test_closer_release_join_date.py b/tests/malware_analyzer/pypi/test_closer_release_join_date.py
index e4b32acd6..4ed1a9b24 100644
--- a/tests/malware_analyzer/pypi/test_closer_release_join_date.py
+++ b/tests/malware_analyzer/pypi/test_closer_release_join_date.py
@@ -3,27 +3,23 @@
"""Tests for closer release join date heuristic."""
from datetime import datetime
+from unittest.mock import MagicMock
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
+from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer
-def test_analyze_pass(setup_closer_release_join_date_analyzer: tuple) -> None:
- """Test analyze method when the heuristic should pass.
-
- Parameters
- ----------
- setup_closer_release_join_date_analyzer: tuple
- The setup fixture returning the analyzer and api_client.
- """
- analyzer, api_client = setup_closer_release_join_date_analyzer
+def test_analyze_pass(pypi_package_json: MagicMock) -> None:
+ """Test analyze method when the heuristic should pass."""
+ analyzer = CloserReleaseJoinDateAnalyzer()
# Set up mock return values.
- api_client.get_maintainers_of_package.return_value = ["maintainer1", "maintainer2"]
- api_client.get_maintainer_join_date.side_effect = [datetime(2018, 1, 1), datetime(2019, 1, 1)]
- api_client.get_latest_release_upload_time.return_value = "2022-06-20T12:00:00"
+ pypi_package_json.pypi_registry.get_maintainers_of_package.return_value = ["maintainer1", "maintainer2"]
+ pypi_package_json.pypi_registry.get_maintainer_join_date.side_effect = [datetime(2018, 1, 1), datetime(2019, 1, 1)]
+ pypi_package_json.get_latest_release_upload_time.return_value = "2022-06-20T12:00:00"
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.PASS
@@ -31,23 +27,17 @@ def test_analyze_pass(setup_closer_release_join_date_analyzer: tuple) -> None:
assert "latest_release_date" in detail_info
-def test_analyze_fail(setup_closer_release_join_date_analyzer: tuple) -> None:
- """Test analyze method when the heuristic should fail.
-
- Parameters
- ----------
- setup_closer_release_join_date_analyzer: tuple
- The setup fixture returning the analyzer and api_client.
- """
- analyzer, api_client = setup_closer_release_join_date_analyzer
+def test_analyze_process(pypi_package_json: MagicMock) -> None:
+ """Test analyze method when the heuristic should fail."""
+ analyzer = CloserReleaseJoinDateAnalyzer()
# Set up mock return values.
- api_client.get_maintainers_of_package.return_value = ["maintainer1"]
- api_client.get_maintainer_join_date.side_effect = [datetime(2022, 6, 18)]
- api_client.get_latest_release_upload_time.return_value = "2022-06-20T12:00:00"
+ pypi_package_json.pypi_registry.get_maintainers_of_package.return_value = ["maintainer1"]
+ pypi_package_json.pypi_registry.get_maintainer_join_date.side_effect = [datetime(2022, 6, 18)]
+ pypi_package_json.get_latest_release_upload_time.return_value = "2022-06-20T12:00:00"
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.FAIL
@@ -55,22 +45,16 @@ def test_analyze_fail(setup_closer_release_join_date_analyzer: tuple) -> None:
assert "latest_release_date" in detail_info
-def test_analyze_skip(setup_closer_release_join_date_analyzer: tuple) -> None:
- """Test analyze method when the heuristic should be skipped.
-
- Parameters
- ----------
- setup_closer_release_join_date_analyzer: tuple
- The setup fixture returning the analyzer and api_client.
- """
- analyzer, api_client = setup_closer_release_join_date_analyzer
+def test_analyze_skip(pypi_package_json: MagicMock) -> None:
+ """Test analyze method when the heuristic should be skipped."""
+ analyzer = CloserReleaseJoinDateAnalyzer()
# Set up mock return values.
- api_client.get_maintainers_of_package.return_value = None
- api_client.get_latest_release_upload_time.return_value = "2022-06-20T12:00:00"
+ pypi_package_json.pypi_registry.get_maintainers_of_package.return_value = None
+ pypi_package_json.get_latest_release_upload_time.return_value = "2022-06-20T12:00:00"
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.SKIP
diff --git a/tests/malware_analyzer/pypi/test_empty_project_link_analyzer.py b/tests/malware_analyzer/pypi/test_empty_project_link_analyzer.py
index 7cc0eeb63..5dad60add 100644
--- a/tests/malware_analyzer/pypi/test_empty_project_link_analyzer.py
+++ b/tests/malware_analyzer/pypi/test_empty_project_link_analyzer.py
@@ -2,58 +2,69 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
"""Tests for heuristic detecting malicious metadata from PyPI"""
+from unittest.mock import MagicMock
+
+import pytest
+
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
+from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
-def test_analyze_no_links(setup_empty_project_link_analyzer: dict) -> None:
- """Test for result failed.
+@pytest.fixture(name="empty_project_link_analyzer")
+def setup_empty_project_link_analyzer() -> dict:
+ """Create an empty-project-link-analyzer setup.
- Parameters
- ----------
- setup_empty_project_link_analyzer: dict
- The setup fixture returning the analyzer and api_client.
+ Returns
+ -------
+ dict: Setup data for the test.
"""
- mock_api_client_fail = setup_empty_project_link_analyzer["mock_api_client_fail"]
- mock_api_client_fail.get_project_links.return_value = {}
+ mock_pypi_package_pass = MagicMock(spec=PyPIPackageJsonAsset)
+ mock_pypi_package_fail = MagicMock(spec=PyPIPackageJsonAsset)
+ analyzer = EmptyProjectLinkAnalyzer()
+ package_links = {
+ "Documentation": "https://requests.readthedocs.io",
+ "Homepage": "https://requests.readthedocs.io",
+ "Source": "https://github.com/psf/requests",
+ }
+
+ return {
+ "mock_pypi_package_pass": mock_pypi_package_pass,
+ "mock_pypi_package_fail": mock_pypi_package_fail,
+ "analyzer": analyzer,
+ "package_links": package_links,
+ }
+
+
+def test_analyze_no_links(empty_project_link_analyzer: dict) -> None:
+ """Test for result failed."""
+ mock_pypi_package_fail = empty_project_link_analyzer["mock_pypi_package_fail"]
+ mock_pypi_package_fail.get_project_links.return_value = {}
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {})
- result = setup_empty_project_link_analyzer["analyzer"].analyze(mock_api_client_fail)
+ result = empty_project_link_analyzer["analyzer"].analyze(mock_pypi_package_fail)
assert result == expected_result
-def test_analyze_with_links(setup_empty_project_link_analyzer: dict) -> None:
- """Test for result passed.
-
- Parameters
- ----------
- setup_empty_project_link_analyzer: dict
- The setup fixture returning the analyzer and api_client.
-
- """
- package_links = setup_empty_project_link_analyzer["package_links"]
- mock_api_client_pass = setup_empty_project_link_analyzer["mock_api_client_pass"]
- mock_api_client_pass.get_project_links.return_value = package_links
+def test_analyze_with_links(empty_project_link_analyzer: dict) -> None:
+ """Test for result passed."""
+ package_links = empty_project_link_analyzer["package_links"]
+ mock_pypi_package_pass = empty_project_link_analyzer["mock_pypi_package_pass"]
+ mock_pypi_package_pass.get_project_links.return_value = package_links
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {"project_links": package_links})
- result = setup_empty_project_link_analyzer["analyzer"].analyze(mock_api_client_pass)
+ result = empty_project_link_analyzer["analyzer"].analyze(mock_pypi_package_pass)
assert result == expected_result
-def test_analyze_none(setup_empty_project_link_analyzer: dict) -> None:
- """Test for result skip.
-
- Parameters
- ----------
- setup_empty_project_link_analyzer: dict
- The setup fixture returning the analyzer and api_client.
-
- """
- mock_api_client_pass = setup_empty_project_link_analyzer["mock_api_client_pass"]
- mock_api_client_pass.get_project_links.return_value = None
- expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.SKIP, {})
+def test_analyze_none(empty_project_link_analyzer: dict) -> None:
+ """Test that missing project links (None) produce a FAIL result."""
+ mock_pypi_package_pass = empty_project_link_analyzer["mock_pypi_package_pass"]
+ mock_pypi_package_pass.get_project_links.return_value = None
+ expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {})
- result = setup_empty_project_link_analyzer["analyzer"].analyze(mock_api_client_pass)
+ result = empty_project_link_analyzer["analyzer"].analyze(mock_pypi_package_pass)
assert result == expected_result
diff --git a/tests/malware_analyzer/pypi/test_high_release_frequency.py b/tests/malware_analyzer/pypi/test_high_release_frequency.py
index e9df944f1..9cd82b570 100644
--- a/tests/malware_analyzer/pypi/test_high_release_frequency.py
+++ b/tests/malware_analyzer/pypi/test_high_release_frequency.py
@@ -3,96 +3,99 @@
"""Tests for high release frequency heuristic."""
+from unittest.mock import MagicMock
+
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
+from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer
-def test_analyze_high_frequency_pass(setup_high_release_frequency_analyzer: tuple) -> None:
+def test_analyze_high_frequency_pass(pypi_package_json: MagicMock) -> None:
"""Test HighReleaseFrequencyAnalyzer with low release frequency (should pass).
Parameters
----------
- setup_high_release_frequency_analyzer: tuple
- The setup fixture returning the analyzer and api_client.
+ pypi_package_json: MagicMock
+ The PyPIPackageJsonAsset MagicMock fixture.
"""
- analyzer, api_client = setup_high_release_frequency_analyzer
+ analyzer = HighReleaseFrequencyAnalyzer()
# Mock return values.
- api_client.get_releases.return_value = {
+ pypi_package_json.get_releases.return_value = {
"0.1": [{"upload_time": "2022-01-01T12:00:00"}],
"0.2": [{"upload_time": "2022-01-10T12:00:00"}],
"0.3": [{"upload_time": "2022-01-20T12:00:00"}],
}
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.PASS
assert detail_info == {"frequency": 9}
-def test_analyze_low_frequency_fail(setup_high_release_frequency_analyzer: tuple) -> None:
+def test_analyze_low_frequency_fail(pypi_package_json: MagicMock) -> None:
"""Test HighReleaseFrequencyAnalyzer with high release frequency (should fail).
Parameters
----------
- setup_high_release_frequency_analyzer: tuple
- The setup fixture returning the analyzer and api_client.
+ pypi_package_json: MagicMock
+ The PyPIPackageJsonAsset MagicMock fixture.
"""
- analyzer, api_client = setup_high_release_frequency_analyzer
+ analyzer = HighReleaseFrequencyAnalyzer()
# Mock return values.
- api_client.get_releases.return_value = {
+ pypi_package_json.get_releases.return_value = {
"0.1": [{"upload_time": "2022-01-01T12:00:00"}],
"0.2": [{"upload_time": "2022-01-02T12:00:00"}],
"0.3": [{"upload_time": "2022-01-04T12:00:00"}],
}
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.FAIL
assert detail_info == {"frequency": 1}
-def test_analyze_no_releases_skip(setup_high_release_frequency_analyzer: tuple) -> None:
+def test_analyze_no_releases_skip(pypi_package_json: MagicMock) -> None:
"""Test HighReleaseFrequencyAnalyzer when no releases are available (should skip).
Parameters
----------
- setup_high_release_frequency_analyzer: tuple
- The setup fixture returning the analyzer and api_client.
+ pypi_package_json: MagicMock
+ The PyPIPackageJsonAsset MagicMock fixture.
"""
- analyzer, api_client = setup_high_release_frequency_analyzer
+ analyzer = HighReleaseFrequencyAnalyzer()
# Mock return values.
- api_client.get_releases.return_value = None
+ pypi_package_json.get_releases.return_value = None
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.SKIP
- assert detail_info == {}
+ assert not detail_info
-def test_analyze_single_release_skip(setup_high_release_frequency_analyzer: tuple) -> None:
+def test_analyze_single_release_skip(pypi_package_json: MagicMock) -> None:
"""Test HighReleaseFrequencyAnalyzer with a single release (should skip).
Parameters
----------
- setup_high_release_frequency_analyzer: tuple
- The setup fixture returning the analyzer and api_client.
+ pypi_package_json: MagicMock
+ The PyPIPackageJsonAsset MagicMock fixture.
"""
- analyzer, api_client = setup_high_release_frequency_analyzer
+ analyzer = HighReleaseFrequencyAnalyzer()
# Mock return values.
- api_client.get_releases.return_value = {"0.1": [{"upload_time": "2022-01-01T12:00:00"}]}
+ pypi_package_json.get_releases.return_value = {"0.1": [{"upload_time": "2022-01-01T12:00:00"}]}
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.SKIP
- assert detail_info == {}
+ assert not detail_info
diff --git a/tests/malware_analyzer/pypi/test_one_release_analyzer.py b/tests/malware_analyzer/pypi/test_one_release_analyzer.py
index 6807f24d1..60ad244ab 100644
--- a/tests/malware_analyzer/pypi/test_one_release_analyzer.py
+++ b/tests/malware_analyzer/pypi/test_one_release_analyzer.py
@@ -2,35 +2,48 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
"""Tests for heuristic detecting malicious metadata from PyPI"""
+from unittest.mock import MagicMock
+
+import pytest
+
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
+from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
-def test_analyze_no_releases(one_release_analyzer: dict) -> None:
- """Test for result skipped.
+@pytest.fixture(name="one_release_analyzer")
+def setup_one_release_analyzer() -> dict:
+ """Create a one-release-analyzer setup.
- Parameters
- ----------
- one_release_analyzer: dict
- The setup fixture returning the analyzer and api_client.
+ Returns
+ -------
+ dict
+ Setup data for the test.
"""
- mock_api_client_pass = one_release_analyzer["mock_api_client_pass"]
- mock_api_client_pass.get_releases.return_value = None
+ mock_pypi_package_pass = MagicMock(spec=PyPIPackageJsonAsset)
+ mock_pypi_package_fail = MagicMock(spec=PyPIPackageJsonAsset)
+ analyzer = OneReleaseAnalyzer()
+
+ return {
+ "mock_pypi_package_fail": mock_pypi_package_fail,
+ "mock_pypi_package_pass": mock_pypi_package_pass,
+ "analyzer": analyzer,
+ }
+
+
+def test_analyze_no_releases(one_release_analyzer: dict) -> None:
+ """Test for result skipped."""
+ mock_pypi_package_pass = one_release_analyzer["mock_pypi_package_pass"]
+ mock_pypi_package_pass.get_releases.return_value = None
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.SKIP, {"releases": {}})
- result = one_release_analyzer["analyzer"].analyze(mock_api_client_pass)
+ result = one_release_analyzer["analyzer"].analyze(mock_pypi_package_pass)
assert result == expected_result
def test_analyze_one_release(one_release_analyzer: dict) -> None:
- """Test for result failed.
-
- Parameters
- ----------
- one_release_analyzer: dict
- The setup fixture returning the analyzer and api_client.
-
- """
+ """Test for result failed."""
release = {
"0.1.0": [
{
@@ -57,23 +70,17 @@ def test_analyze_one_release(one_release_analyzer: dict) -> None:
}
]
}
- mock_api_client_fail = one_release_analyzer["mock_api_client_fail"]
- mock_api_client_fail.get_releases.return_value = release
+ mock_pypi_package_fail = one_release_analyzer["mock_pypi_package_fail"]
+ mock_pypi_package_fail.get_releases.return_value = release
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {"releases": release})
- result = one_release_analyzer["analyzer"].analyze(mock_api_client_fail)
+ result = one_release_analyzer["analyzer"].analyze(mock_pypi_package_fail)
assert result == expected_result
def test_analyze_multiple_releases(one_release_analyzer: dict) -> None:
- """Test for result passed.
-
- Parameters
- ----------
- one_release_analyzer: dict
- The setup fixture returning the analyzer and api_client.
- """
+ """Test for result passed."""
releases = {
"0.0.1": [],
"0.10.0": [
@@ -101,10 +108,9 @@ def test_analyze_multiple_releases(one_release_analyzer: dict) -> None:
}
],
}
- mock_api_client_pass = one_release_analyzer["mock_api_client_pass"]
- mock_api_client_pass.get_releases.return_value = releases
+ mock_pypi_package_pass = one_release_analyzer["mock_pypi_package_pass"]
+ mock_pypi_package_pass.get_releases.return_value = releases
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {"releases": releases})
- result = one_release_analyzer["analyzer"].analyze(mock_api_client_pass)
-
+ result = one_release_analyzer["analyzer"].analyze(mock_pypi_package_pass)
assert result == expected_result
diff --git a/tests/malware_analyzer/pypi/test_suspicious_setup.py b/tests/malware_analyzer/pypi/test_suspicious_setup.py
index 934094887..ec9af0f0f 100644
--- a/tests/malware_analyzer/pypi/test_suspicious_setup.py
+++ b/tests/malware_analyzer/pypi/test_suspicious_setup.py
@@ -8,7 +8,7 @@
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.sourcecode.suspicious_setup import SuspiciousSetupAnalyzer
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
def test_analyze_skip() -> None:
@@ -16,12 +16,12 @@ def test_analyze_skip() -> None:
The heuristic analyzer should return SKIP if the URL is not present.
"""
- mock_pypi_registry = MagicMock(spec=PyPIRegistry)
- mock_pypi_registry.get_sourcecode_url.return_value = None
+ mock_pypi_package = MagicMock(spec=PyPIPackageJsonAsset)
+ mock_pypi_package.get_sourcecode_url.return_value = None
analyzer = SuspiciousSetupAnalyzer()
- result, data = analyzer.analyze(mock_pypi_registry)
+ result, data = analyzer.analyze(mock_pypi_package)
assert result == HeuristicResult.SKIP
assert not data
@@ -31,16 +31,17 @@ def test_analyze_fail() -> None:
The heuristic analyzer should return FAIL if the suspicious import is found.
"""
- mock_pypi_registry = MagicMock(spec=PyPIRegistry)
- mock_pypi_registry.get_sourcecode_url.return_value = "http://example.com/sourcecode.tar.gz"
+ mock_pypi_package = MagicMock(spec=PyPIPackageJsonAsset)
+ mock_pypi_package.get_sourcecode_url.return_value = "http://example.com/sourcecode.tar.gz"
analyzer = SuspiciousSetupAnalyzer()
with patch.object(analyzer, "_get_setup_source_code", return_value="import base64\n"):
- result, data = analyzer.analyze(mock_pypi_registry)
+ result, data = analyzer.analyze(mock_pypi_package)
# Assert that the result is FAIL and the data contains the imported module.
assert result == HeuristicResult.FAIL
+ assert isinstance(data["import_module"], list)
assert "base64" in data["import_module"]
@@ -49,16 +50,17 @@ def test_analyze_no_suspicious_import() -> None:
The heuristic analyzer should return PASS if no suspicious imports are found.
"""
- mock_pypi_registry = MagicMock(spec=PyPIRegistry)
- mock_pypi_registry.get_sourcecode_url.return_value = "http://example.com/sourcecode.tar.gz"
+ mock_pypi_package = MagicMock(spec=PyPIPackageJsonAsset)
+ mock_pypi_package.get_sourcecode_url.return_value = "http://example.com/sourcecode.tar.gz"
analyzer = SuspiciousSetupAnalyzer()
with patch.object(analyzer, "_get_setup_source_code", return_value="import random\n"):
- result, data = analyzer.analyze(mock_pypi_registry)
+ result, data = analyzer.analyze(mock_pypi_package)
# Assert that the result is PASS and the data contains the imported module.
assert result == HeuristicResult.PASS
+ assert isinstance(data["import_module"], list)
assert "random" in data["import_module"]
diff --git a/tests/malware_analyzer/pypi/test_unchanged_release.py b/tests/malware_analyzer/pypi/test_unchanged_release.py
index 227f3955d..f1162aaea 100644
--- a/tests/malware_analyzer/pypi/test_unchanged_release.py
+++ b/tests/malware_analyzer/pypi/test_unchanged_release.py
@@ -2,75 +2,78 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
"""Tests for heuristic detecting malicious metadata from PyPI"""
+from unittest.mock import MagicMock
+
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
+from macaron.malware_analyzer.pypi_heuristics.metadata.unchanged_release import UnchangedReleaseAnalyzer
-def test_analyze_pass(setup_unchanged_release_analyzer: tuple) -> None:
+def test_analyze_pass(pypi_package_json: MagicMock) -> None:
"""Test the analyze method returning PASS.
Parameters
----------
- setup_unchanged_release_analyzer: tuple
- Fixture for UnchangedReleaseAnalyzer and mocked PyPIRegistry.
+ pypi_package_json: MagicMock
+ The PyPIPackageJsonAsset MagicMock fixture.
"""
- analyzer, api_client = setup_unchanged_release_analyzer
+ analyzer = UnchangedReleaseAnalyzer()
# Set up mock return values.
- api_client.get_releases.return_value = {
+ pypi_package_json.get_releases.return_value = {
"v1.0": [{"digests": {"sha256": "digest1"}}],
"v1.1": [{"digests": {"sha256": "digest2"}}],
"v1.2": [{"digests": {"sha256": "digest3"}}],
}
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.PASS
- assert detail_info == {}
+ assert not detail_info
-def test_analyze_fail(setup_unchanged_release_analyzer: tuple) -> None:
+def test_analyze_fail(pypi_package_json: MagicMock) -> None:
"""Test the analyze method returning FAIL.
Parameters
----------
- setup_unchanged_release_analyzer: tuple
- Fixture for UnchangedReleaseAnalyzer and mocked PyPIRegistry.
+ pypi_package_json: MagicMock
+ The PyPIPackageJsonAsset MagicMock fixture.
"""
- analyzer, api_client = setup_unchanged_release_analyzer
+ analyzer = UnchangedReleaseAnalyzer()
# Set up mock return values.
- api_client.get_releases.return_value = {
+ pypi_package_json.get_releases.return_value = {
"v1.0": [{"digests": {"sha256": "digest1"}}],
"v1.1": [{"digests": {"sha256": "digest1"}}], # Duplicate digest.
"v1.2": [{"digests": {"sha256": "digest2"}}],
}
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.FAIL
- assert detail_info == {}
+ assert not detail_info
-def test_analyze_skip(setup_unchanged_release_analyzer: tuple) -> None:
+def test_analyze_skip(pypi_package_json: MagicMock) -> None:
"""Test the analyze method returning SKIP.
Parameters
----------
- setup_unchanged_release_analyzer: tuple
- Fixture for UnchangedReleaseAnalyzer and mocked PyPIRegistry.
+ pypi_package_json: MagicMock
+ The PyPIPackageJsonAsset MagicMock fixture.
"""
- analyzer, api_client = setup_unchanged_release_analyzer
+ analyzer = UnchangedReleaseAnalyzer()
# Set up mock return values.
- api_client.get_releases.return_value = None
+ pypi_package_json.get_releases.return_value = None
# Call the method.
- result, detail_info = analyzer.analyze(api_client)
+ result, detail_info = analyzer.analyze(pypi_package_json)
# Assert.
assert result == HeuristicResult.SKIP
- assert detail_info == {}
+ assert not detail_info
diff --git a/tests/malware_analyzer/pypi/test_unreachable_project_links_analyzer.py b/tests/malware_analyzer/pypi/test_unreachable_project_links_analyzer.py
index f69e88c21..410fe925e 100644
--- a/tests/malware_analyzer/pypi/test_unreachable_project_links_analyzer.py
+++ b/tests/malware_analyzer/pypi/test_unreachable_project_links_analyzer.py
@@ -7,22 +7,21 @@
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.metadata.unreachable_project_links import UnreachableProjectLinksAnalyzer
-from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
@patch("requests.head")
def test_analyze_with_all_reachable_links(mock_head: Mock) -> None:
"""Test for all links are reachable"""
# Setup.
- package = "requests"
- mock_api_client = MagicMock(spec=PyPIRegistry(package))
+ mock_pypi_package = MagicMock(spec=PyPIPackageJsonAsset)
project_links: dict = {
"Documentation": "https://requests.readthedocs.io",
"Homepage": "https://requests.readthedocs.io",
"Source": "https://github.com/psf/requests",
}
- mock_api_client.get_project_links.return_value = project_links
+ mock_pypi_package.get_project_links.return_value = project_links
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {})
mock_response = MagicMock()
@@ -31,7 +30,7 @@ def test_analyze_with_all_reachable_links(mock_head: Mock) -> None:
analyzer = UnreachableProjectLinksAnalyzer()
# Execute.
- result: tuple[HeuristicResult, dict] = analyzer.analyze(api_client=mock_api_client)
+ result: tuple[HeuristicResult, dict] = analyzer.analyze(mock_pypi_package)
# Verify.
assert result == expected_result
@@ -41,10 +40,9 @@ def test_analyze_with_all_reachable_links(mock_head: Mock) -> None:
def test_analyze_with_all_unreachable_links(mock_head: Mock) -> None:
"""Test for all project links are unreachable"""
# Setup.
- package = "requests5"
project_links: dict = {"Homepage": "https://github.com/jiangfubang/fast_requests"}
- mock_api_client = MagicMock(spec=PyPIRegistry(package))
- mock_api_client.get_project_links.return_value = project_links
+ mock_pypi_package = MagicMock(spec=PyPIPackageJsonAsset)
+ mock_pypi_package.get_project_links.return_value = project_links
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {})
analyzer = UnreachableProjectLinksAnalyzer()
@@ -53,7 +51,7 @@ def test_analyze_with_all_unreachable_links(mock_head: Mock) -> None:
mock_head.return_value = mock_response
# Execute.
- result: tuple[HeuristicResult, dict] = analyzer.analyze(api_client=mock_api_client)
+ result: tuple[HeuristicResult, dict] = analyzer.analyze(mock_pypi_package)
# Verify.
assert result == expected_result
@@ -63,14 +61,13 @@ def test_analyze_with_no_project_links() -> None:
"""Test for the metadata missing"""
# TODO Package with missing metadata is not available now
# Setup.
- package = "example" * 5 # Not a package on PyPI.
- mock_api_client = MagicMock(spec=PyPIRegistry(package))
- mock_api_client.get_project_links.return_value = None
+ mock_pypi_package = MagicMock(spec=PyPIPackageJsonAsset)
+ mock_pypi_package.get_project_links.return_value = None
analyzer = UnreachableProjectLinksAnalyzer()
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.SKIP, {})
# Execute.
- result: tuple[HeuristicResult, dict] = analyzer.analyze(api_client=mock_api_client)
+ result: tuple[HeuristicResult, dict] = analyzer.analyze(mock_pypi_package)
# Verify.
assert result == expected_result
@@ -80,14 +77,13 @@ def test_analyze_with_no_project_links() -> None:
def test_analyze_with_mixed_links(mock_head: Mock) -> None:
"""Test for the situation when the links are partially accessible"""
# Setup.
- package = "requests"
project_links: dict = {
"Documentation": "https://requests.readthedocs.io",
"Homepage": "https://requests.readthedocs.io",
"Source": "https://badurl.com",
}
- mock_api_client = MagicMock(spec=PyPIRegistry(package))
- mock_api_client.get_project_links.return_value = project_links
+ mock_pypi_package = MagicMock(spec=PyPIPackageJsonAsset)
+ mock_pypi_package.get_project_links.return_value = project_links
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {})
# Mock responses for each URL.
@@ -103,7 +99,7 @@ def side_effect(url: str, *args: tuple, **kwargs: dict) -> Mock: # pylint: disa
analyzer = UnreachableProjectLinksAnalyzer()
# Execute.
- result: tuple[HeuristicResult, dict] = analyzer.analyze(api_client=mock_api_client)
+ result: tuple[HeuristicResult, dict] = analyzer.analyze(mock_pypi_package)
# Verify.
assert result == expected_result
diff --git a/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson-8.2.source b/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson-8.2.source
new file mode 100644
index 000000000..421b1765b
Binary files /dev/null and b/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson-8.2.source differ
diff --git a/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson.html b/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson.html
new file mode 100644
index 000000000..351772a82
--- /dev/null
+++ b/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson.html
@@ -0,0 +1,968 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ zlibxjson · PyPI
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Skip to main content
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Project description
+
+
zlibxjson
+
zlibxjson est un package
+
+
+
+
+
+
Project details
+
+
+
+
+
+
+
+
+
Download files
+
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
+
+Source Distribution
+
+
+
+
+
+Built Distribution
+
+
+
+
+
+
+
+
+
+ Close
+
+
+
+Hashes for zlibxjson-8.2.tar.gz
+
+ Hashes for zlibxjson-8.2.tar.gz
+
+
+ Algorithm |
+ Hash digest |
+ |
+
+
+
+
+ SHA256 |
+ ffd429805b115400d4ccf550e2d480863ab47891ea0c76f616823f8219ebdce0 |
+
+
+ |
+
+
+ MD5 |
+ 7cca7668ed361e231be32f9a5799c4d5 |
+
+
+ |
+
+
+ BLAKE2b-256 |
+ 3e1eb1ecb05e7ca1eb74ca6257a7f43d052b90d2ac01feb28eb28ce677a871ab |
+
+
+ |
+
+
+
+
+
+
+
+
+
+
+ Close
+
+
+
+Hashes for zlibxjson-8.2-py3-none-any.whl
+
+ Hashes for zlibxjson-8.2-py3-none-any.whl
+
+
+ Algorithm |
+ Hash digest |
+ |
+
+
+
+
+ SHA256 |
+ a46e553386f6ecd2e1429a77dadbf8074aa31ad7f5ab502342af6c06162b37d4 |
+
+
+ |
+
+
+ MD5 |
+ b503f2615105f8e1e65e774ce8e741ab |
+
+
+ |
+
+
+ BLAKE2b-256 |
+ 55b33a43f065f6199d519ebbb48f3a94c4f0557beb34bbed48c1ba89c67b1959 |
+
+
+ |
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson_package.json b/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson_package.json
new file mode 100644
index 000000000..32f84fbcd
--- /dev/null
+++ b/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson_package.json
@@ -0,0 +1 @@
+{"info":{"author":"Votre Nom","author_email":"votre.email@example.com","bugtrack_url":null,"classifiers":["License :: OSI Approved :: MIT License","Operating System :: OS Independent","Programming Language :: Python :: 3"],"description":"# zlibxjson\r\n\r\nzlibxjson est un package \r\n","description_content_type":"text/markdown","docs_url":null,"download_url":null,"downloads":{"last_day":-1,"last_month":-1,"last_week":-1},"dynamic":null,"home_page":"https://github.com/votre_nom/votre_repository","keywords":null,"license":null,"maintainer":null,"maintainer_email":null,"name":"zlibxjson","package_url":"https://pypi.org/project/zlibxjson/","platform":null,"project_url":"https://pypi.org/project/zlibxjson/","project_urls":{"Homepage":"https://github.com/votre_nom/votre_repository"},"provides_extra":null,"release_url":"https://pypi.org/project/zlibxjson/8.2/","requires_dist":["requests"],"requires_python":">=3.6","summary":"Package","version":"8.2","yanked":false,"yanked_reason":null},"last_serial":23910021,"releases":{"8.1":[{"comment_text":"","digests":{"blake2b_256":"c2f8fce576879546cdb56b18dc1a780d6caa5155070eef7b04579c19309d3454","md5":"bd0b0422d881fb82b4e07f1e5b1da403","sha256":"cefa8dc4947eba0e90c46a6373d9a5fb95d003e06c9bfeeb2cbe59dc40b57f7a"},"downloads":-1,"filename":"zlibxjson-8.1-py3-none-any.whl","has_sig":false,"md5_digest":"bd0b0422d881fb82b4e07f1e5b1da403","packagetype":"bdist_wheel","python_version":"py3","requires_python":">=3.6","size":2441,"upload_time":"2024-06-29T20:22:51","upload_time_iso_8601":"2024-06-29T20:22:51.355930Z","url":"https://files.pythonhosted.org/packages/c2/f8/fce576879546cdb56b18dc1a780d6caa5155070eef7b04579c19309d3454/zlibxjson-8.1-py3-none-any.whl","yanked":false,"yanked_reason":null},{"comment_text":"","digests":{"blake2b_256":"26e234e13483deb6dd1dba1531eee8006942a9727e388971d2807a9785f07179","md5":"39c4eabbf1c3898e2ea8ad466100447c","sha256":"abda4c7287329e8bd097cda125d493ed0313a40043a7a18871dae8897f879d81"},"downloads":-1,"file
name":"zlibxjson-8.1.tar.gz","has_sig":false,"md5_digest":"39c4eabbf1c3898e2ea8ad466100447c","packagetype":"sdist","python_version":"source","requires_python":">=3.6","size":1984,"upload_time":"2024-06-29T20:22:52","upload_time_iso_8601":"2024-06-29T20:22:52.730363Z","url":"https://files.pythonhosted.org/packages/26/e2/34e13483deb6dd1dba1531eee8006942a9727e388971d2807a9785f07179/zlibxjson-8.1.tar.gz","yanked":false,"yanked_reason":null}],"8.2":[{"comment_text":"","digests":{"blake2b_256":"55b33a43f065f6199d519ebbb48f3a94c4f0557beb34bbed48c1ba89c67b1959","md5":"b503f2615105f8e1e65e774ce8e741ab","sha256":"a46e553386f6ecd2e1429a77dadbf8074aa31ad7f5ab502342af6c06162b37d4"},"downloads":-1,"filename":"zlibxjson-8.2-py3-none-any.whl","has_sig":false,"md5_digest":"b503f2615105f8e1e65e774ce8e741ab","packagetype":"bdist_wheel","python_version":"py3","requires_python":">=3.6","size":2442,"upload_time":"2024-06-29T20:46:26","upload_time_iso_8601":"2024-06-29T20:46:26.626290Z","url":"https://files.pythonhosted.org/packages/55/b3/3a43f065f6199d519ebbb48f3a94c4f0557beb34bbed48c1ba89c67b1959/zlibxjson-8.2-py3-none-any.whl","yanked":false,"yanked_reason":null},{"comment_text":"","digests":{"blake2b_256":"3e1eb1ecb05e7ca1eb74ca6257a7f43d052b90d2ac01feb28eb28ce677a871ab","md5":"7cca7668ed361e231be32f9a5799c4d5","sha256":"ffd429805b115400d4ccf550e2d480863ab47891ea0c76f616823f8219ebdce0"},"downloads":-1,"filename":"zlibxjson-8.2.tar.gz","has_sig":false,"md5_digest":"7cca7668ed361e231be32f9a5799c4d5","packagetype":"sdist","python_version":"source","requires_python":">=3.6","size":1993,"upload_time":"2024-06-29T20:46:28","upload_time_iso_8601":"2024-06-29T20:46:28.770593Z","url":"https://files.pythonhosted.org/packages/3e/1e/b1ecb05e7ca1eb74ca6257a7f43d052b90d2ac01feb28eb28ce677a871ab/zlibxjson-8.2.tar.gz","yanked":false,"yanked_reason":null}]},"urls":[{"comment_text":"","digests":{"blake2b_256":"55b33a43f065f6199d519ebbb48f3a94c4f0557beb34bbed48c1ba89c67b1959","md5":"b503f2615105f8e1e65e
774ce8e741ab","sha256":"a46e553386f6ecd2e1429a77dadbf8074aa31ad7f5ab502342af6c06162b37d4"},"downloads":-1,"filename":"zlibxjson-8.2-py3-none-any.whl","has_sig":false,"md5_digest":"b503f2615105f8e1e65e774ce8e741ab","packagetype":"bdist_wheel","python_version":"py3","requires_python":">=3.6","size":2442,"upload_time":"2024-06-29T20:46:26","upload_time_iso_8601":"2024-06-29T20:46:26.626290Z","url":"https://files.pythonhosted.org/packages/55/b3/3a43f065f6199d519ebbb48f3a94c4f0557beb34bbed48c1ba89c67b1959/zlibxjson-8.2-py3-none-any.whl","yanked":false,"yanked_reason":null},{"comment_text":"","digests":{"blake2b_256":"3e1eb1ecb05e7ca1eb74ca6257a7f43d052b90d2ac01feb28eb28ce677a871ab","md5":"7cca7668ed361e231be32f9a5799c4d5","sha256":"ffd429805b115400d4ccf550e2d480863ab47891ea0c76f616823f8219ebdce0"},"downloads":-1,"filename":"zlibxjson-8.2.tar.gz","has_sig":false,"md5_digest":"7cca7668ed361e231be32f9a5799c4d5","packagetype":"sdist","python_version":"source","requires_python":">=3.6","size":1993,"upload_time":"2024-06-29T20:46:28","upload_time_iso_8601":"2024-06-29T20:46:28.770593Z","url":"https://files.pythonhosted.org/packages/3e/1e/b1ecb05e7ca1eb74ca6257a7f43d052b90d2ac01feb28eb28ce677a871ab/zlibxjson-8.2.tar.gz","yanked":false,"yanked_reason":null}],"vulnerabilities":[]}
diff --git a/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson_user.html b/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson_user.html
new file mode 100644
index 000000000..a74461932
--- /dev/null
+++ b/tests/slsa_analyzer/checks/resources/pypi_files/zlibxjson_user.html
@@ -0,0 +1,446 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Profile of tser111111 · PyPI
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Skip to main content
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/slsa_analyzer/checks/test_detect_malicious_metadata_check.py b/tests/slsa_analyzer/checks/test_detect_malicious_metadata_check.py
new file mode 100644
index 000000000..45786aa78
--- /dev/null
+++ b/tests/slsa_analyzer/checks/test_detect_malicious_metadata_check.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
+
+"""Module to test the malicious metadata detection check."""
+
+import json
+import os
+import urllib.parse
+from pathlib import Path
+
+import pytest
+from pytest_httpserver import HTTPServer
+
+from macaron.config.defaults import load_defaults
+from macaron.slsa_analyzer.build_tool.base_build_tool import BaseBuildTool
+from macaron.slsa_analyzer.checks.check_result import CheckResultType
+from macaron.slsa_analyzer.checks.detect_malicious_metadata_check import DetectMaliciousMetadataCheck
+from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIRegistry
+from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo
+from tests.conftest import MockAnalyzeContext
+
+RESOURCE_PATH = Path(__file__).parent.joinpath("resources")
+
+
+@pytest.mark.parametrize(
+ ("purl", "expected"),
+ [
+ ("pkg:pypi/zlibxjson", CheckResultType.FAILED),
+ ("pkg:pypi/test", CheckResultType.UNKNOWN),
+ ("pkg:maven:test/test", CheckResultType.UNKNOWN),
+ ],
+)
+def test_detect_malicious_metadata(
+ httpserver: HTTPServer, tmp_path: Path, pip_tool: BaseBuildTool, macaron_path: Path, purl: str, expected: str
+) -> None:
+ """Test the check on a PyPI package with mocked registry data, an unmocked PyPI package and a non-PyPI purl."""
+ check = DetectMaliciousMetadataCheck()
+
+ # Set up the context object with a PyPIRegistry instance registered as its package registry.
+ ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir="", purl=purl)
+ pypi_registry = PyPIRegistry()
+ ctx.dynamic_data["package_registries"] = [PackageRegistryInfo(pip_tool, pypi_registry)]
+
+ # Load the fixture files that the httpserver plugin serves below as responses of PyPI endpoints.
+ with open(os.path.join(RESOURCE_PATH, "pypi_files", "zlibxjson.html"), encoding="utf8") as page:
+ p_page_content = page.read()
+
+ with open(os.path.join(RESOURCE_PATH, "pypi_files", "zlibxjson_user.html"), encoding="utf8") as page:
+ u_page_content = page.read()
+
+ with open(os.path.join(RESOURCE_PATH, "pypi_files", "zlibxjson_package.json"), encoding="utf8") as page:
+ package_json = json.load(page)
+
+ with open(os.path.join(RESOURCE_PATH, "pypi_files", "zlibxjson-8.2.source"), "rb") as source:
+ source_tarball = source.read()
+
+ base_url_parsed = urllib.parse.urlparse(httpserver.url_for(""))
+ user_config_input = f"""
+ [package_registry.pypi]
+ request_timeout = 20
+ registry_url_netloc = {base_url_parsed.netloc}
+ registry_url_scheme = {base_url_parsed.scheme}
+ fileserver_url_netloc = {base_url_parsed.netloc}
+ fileserver_url_scheme = {base_url_parsed.scheme}
+ """
+ user_config_path = os.path.join(tmp_path, "config.ini")
+ with open(user_config_path, "w", encoding="utf-8") as user_config_file:
+ user_config_file.write(user_config_input)
+ # We don't have to worry about modifying the ``defaults`` object causing test
+ # pollution here, since we reload the ``defaults`` object before every test with the
+ # ``setup_test`` fixture.
+ load_defaults(user_config_path)
+ pypi_registry.load_defaults()
+
+ httpserver.expect_request("/project/zlibxjson").respond_with_data(p_page_content)
+ httpserver.expect_request("/user/tser111111").respond_with_data(u_page_content)
+ httpserver.expect_request("/pypi/zlibxjson/json").respond_with_json(package_json)
+ httpserver.expect_request(
+ "/packages/3e/1e/b1ecb05e7ca1eb74ca6257a7f43d052b90d2ac01feb28eb28ce677a871ab/zlibxjson-8.2.tar.gz"
+ ).respond_with_data(source_tarball, content_type="application/octet-stream")
+
+ assert check.run_check(ctx).result_type == expected