diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index a3d1dcbc5..8e5da2abb 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -179,6 +179,7 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None analyzer_single_args.sbom_path, deps_depth, provenance_payload=prov_payload, + validate_malware_switch=analyzer_single_args.validate_malware_switch, ) sys.exit(status_code) @@ -483,6 +484,13 @@ def main(argv: list[str] | None = None) -> None: "The path to the local .m2 directory. If this option is not used, Macaron will use the default location at $HOME/.m2" ), ) + + single_analyze_parser.add_argument( + "--validate-malware-switch", + required=False, + action="store_true", + help=("Enable malware validation."), + ) # Dump the default values. sub_parser.add_parser(name="dump-defaults", description="Dumps the defaults.ini file to the output directory.") diff --git a/src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py b/src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py new file mode 100644 index 000000000..edf7a1830 --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py @@ -0,0 +1,491 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +""" +Detect suspicious function calls in the code and trace the arguments back to their original values. + +This allows for deeper analysis of potentially malicious behavior. 
class DataFlowTracer(ast.NodeVisitor):
    """Build a flat symbol table from assignments and trace variables back to their values.

    The table is flow-insensitive: it maps a simple variable name to the textual form of the
    value most recently seen for it while visiting the tree.
    """

    def __init__(self) -> None:
        self.symbol_table: dict = {}  # Variable name -> textual form of the assigned value.
        self.trace_path: list = []  # Result of the most recent trace_back() call.

    def visit_Assign(self, node: ast.Assign) -> None:  # noqa: N802 # pylint: disable=C0103
        """Record every simple-name target of an assignment in the symbol table."""
        for target in node.targets:
            if not isinstance(target, ast.Name):
                # Tuple unpacking, attribute and subscript targets are not tracked.
                continue
            if isinstance(node.value, ast.Name):
                self.symbol_table[target.id] = str(node.value.id)
            elif isinstance(node.value, ast.Constant):
                self.symbol_table[target.id] = str(node.value.value)
            else:
                # Any other expression (call, list, f-string, ...) is stored as source text.
                self.symbol_table[target.id] = ast.unparse(node.value)
        # Keep descending so that assignments nested in the value are recorded as well.
        self.generic_visit(node)

    def trace_back(self, variable_name: str) -> list:
        """Get the full path of the dataflow.

        Parameters
        ----------
        variable_name: str
            The argument of the function call.

        Returns
        -------
        list
            The variable name followed by every value it resolves to, or an empty list when the
            name is not in the symbol table.
        """
        self.trace_path = []
        self._recursive_trace(variable_name)
        return self.trace_path

    def _recursive_trace(self, variable_name: str) -> None:
        """Append the dataflow path of ``variable_name`` to ``self.trace_path``.

        The walk is iterative and remembers the names it has already expanded. Because the symbol
        table ignores control flow, ordinary code such as ``a = b`` followed later by ``b = a``
        produces a cycle; following it blindly would never terminate.

        Parameters
        ----------
        variable_name: str
            The argument of the function call.
        """
        if variable_name not in self.symbol_table:
            return

        expanded: set = set()
        current = variable_name
        while True:
            expanded.add(current)
            value = self.symbol_table[current]
            if not self.trace_path:
                self.trace_path.extend([current, value])
            else:
                self.trace_path.append(value)

            # Only continue when the value is itself a tracked variable name.
            if not isinstance(value, str) or value not in self.symbol_table:
                break
            # Stop on a self reference (x -> x) and on any longer cycle (a -> b -> a).
            if self.symbol_table[value] == value or value in expanded:
                break
            current = value

    def generate_symbol_table(self, source_code: str) -> None:
        """Generate the symbol table.

        Parameters
        ----------
        source_code: str
            The source code of the script.

        Raises
        ------
        SyntaxError
            If the source code cannot be parsed.
        """
        self.visit(ast.parse(source_code))
+ # In the future, the dataflow will be more complicated and even handle the cross-file dataflow. + tracer = DataFlowTracer() + tracer.generate_symbol_table(content) + logger.debug(tracer.symbol_table) + + # TODO: In the future, the probability policy to decide the file is malicious or not + # will be implemented. Therefore, the functioncall_analyzer.analyze() will return detail_info + # and analysis result. + functioncall_analyzer = FunctionCallAnalyzer(self.suspicious_pattern, tracer) + is_malware, detail_info = functioncall_analyzer.analyze(content) + if is_malware: + self.is_malware = is_malware + + # TODO: Currently, the result collector does not handle the situation that + # multiple same filename. In the future, this will be replace with absolute path. + if detail_info: + self.analysis_result[filename] = detail_info + + # TODO: Implement other suspicious setup in suspicious_pattern.yaml + # pattern = r"install_requires\s*=\s*\[(.*?)\]" + # matches: re.Match | None = re.search(pattern, content, re.DOTALL) + # if matches: + # install_requires: set[str] | None = set(re.findall(r"'(.*?)'", matches.group(1))) + # if ( + # install_requires + # and install_requires & set(self.suspicious_pattern["imports"]) + # and len(install_requires) < 4 + # # This threshold is based on historical malwares + # ): + # extracted_data["install_requires"] = install_requires + # TODO: In the future this result from each file will be used to calculate the probability. + # Then the is_malicious will be based on this value. 
+ # Currently, the default policy is + return self.is_malware, self.analysis_result + + # def extract_susupicious_content(self) -> None: + # """Extract the suspicious content from the source code.""" + # if not self.source_code or not self.suspicious_pattern: + # return + # self.extracted_suspicious_content = self._extract_suspicious_content_from_source() + + def _load_suspicious_pattern(self) -> dict[str, JsonType] | None: + """Load the suspicious pattern from suspicious_pattern.yaml. + + Returns + ------- + dict[str, JsonType] | None + The suspicious pattern. + """ + filename: str = "suspicious_pattern.yaml" + curr_dir: pathlib.Path = pathlib.Path(__file__).parent.absolute() + suspicious_pattern_file: str = os.path.join(curr_dir, filename) + with open(suspicious_pattern_file, encoding="utf-8") as file: + try: + suspicious_pattern: dict[str, JsonType] = yaml.safe_load(file) + except yaml.YAMLError as yaml_exception: + logger.debug("Error parsing the yaml file: '%s'", yaml_exception) + return None + return suspicious_pattern + + def _extract_imports_from_ast(self, content: str) -> set[str]: + """Extract imports from source code using the parsed AST. + + Parameters + ---------- + source_content: str + The source code as a string. + + Returns + ------- + set[str] + The set of imports. + + Raises + ------ + SyntaxError + If the code could not be parsed. + """ + imports = set() + tree = ast.parse(content) + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + imports.add(alias.name) + elif isinstance(node, ast.ImportFrom): + module = node.module + if module: + _module = "." * node.level + module + imports.add(_module) + for name in node.names: + imports.add(_module + "." + name.name) + + return imports + + def _extract_imports_from_lines(self, content: str) -> set[str]: + """Extract imports from source code using per line pattern matching. + + Parameters + ---------- + source_content: str + The source code as a string. 
+ + Returns + ------- + set[str] + The list of imports. + """ + alias_pattern = r"\s+as\s+\w+(?:\.{0,1}\w+)*" + # Pattern for module aliases. + + module_name = r"\w+(?:\.{0,1}\w+" + # as described under pattern_import. + + pattern_import = ( + r"(?:import\s+)(" + module_name + r")*(?:" + alias_pattern + r")?" + r"(?:(?:\s*,\s*)(?:" + module_name + r")*(?:" + alias_pattern + r")?))*)(?:(?:\s|#).*)?" + ) + # Allows for a standard import statement. + # E.g.: import + # Where consists of one or more . + # Where consists of one or more words (a-z or 0-9 or underscore) separated by periods, + # with an optional alias. + # Where allows any character(s) either after a single space or a hash (#). + + pattern_from_import = ( + r"(?:from\s+)([.]*" + + module_name + + r")*)(?:\s+import\s+(\w+(?:\s+as\s+\w+)?(?:(?:\s*,\s*)(?:\w+(?:\s+as\s+\w+)?))*))" + ) + # Allows for a from import statement. + # E.g.: from import + # Where is as above, but can also be preceded by any number of periods. + # (Note only a single module can be placed here.) + # Where consists of one or more with optional aliases. + # Where is identical to except without any periods. + # Where requires at least one space followed by one or more word characters, plus + # any other characters following on from that. + + combined_pattern = f"^(?:{pattern_import})|(?:{pattern_from_import})$" + # The combined pattern creates two match groups: + # 1 - standard import statement. + # 2 - from import statement module. + # 3 - from import statement module components. + + imports = set() + for line in content.splitlines(): + line.strip() + match = re.match(combined_pattern, line) + if not match: + continue + + if match.group(1): + # Standard import, handle commas and aliases if present. 
+ splits = self._prune_aliased_lines(match.group(1), alias_pattern) + for split in splits: + imports.add(split) + elif match.group(2): + # From import + imports.add(match.group(2)) + if match.group(3): + splits = self._prune_aliased_lines(match.group(3), alias_pattern) + for split in splits: + imports.add(match.group(2) + "." + split) + + return imports + + def _prune_aliased_lines(self, text: str, alias_pattern: str) -> list[str]: + """Split the line on commas and remove any aliases from individual parts.""" + results = [] + splits = text.split(",") + for split in splits: + split = split.strip() + results.append(re.sub(alias_pattern, "", split)) + return results + + +class FunctionCallAnalyzer(ast.NodeVisitor): + """This class analyzes Python source code to identify potential suspicious behavior.""" + + def __init__(self, suspicious_pattern: dict, tracer: DataFlowTracer) -> None: + """Initialize the analyzer. + + Parameters + ---------- + suspicious_pattern: dict + The suspicious behaviour mainly includes the function call and constant. + """ + self.suspicious_patterns: dict = suspicious_pattern + self.analysis_detail: dict = { + "OS Detection": {}, + "Code Execution": {}, + "Information Collecting": {}, + "Remote Connection": {}, + "Custom Setup": {}, + "Obfuscation": {}, + } + self.tracer = tracer + self.is_malware = False + + def visit_Module(self, node: ast.Module) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit all root node.""" + self.generic_visit(node) + + # TODO: Detect OS might generate false alert. + # def visit_If(self, node: ast.If) -> None: + # """Visit the If node.""" + # if isinstance(node.test, ast.Compare): + # unparsed_expr: str = ast.unparse(node) + # # Some malware excute different malicious code based on the victims OS. 
+ # for os_detection_constant in self.suspicious_patterns["ast_constant"]["os_detection"]: + # if os_detection_constant in unparsed_expr: + # TODO: This function is required to be implemented with dataflow analysis + # self.analysis_detail["OS Detection"][node.lineno] = unparsed_expr + # self.is_malware = True + # self.generic_visit(node) + + def visit_Call(self, node: ast.Call) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit the Call node.""" + suspicious_calls: dict = self.suspicious_patterns["ast_calls"] + suspicious_const: dict = self.suspicious_patterns["ast_constant"] + function_call: str = ast.unparse(node.func) + args: str = " ".join([ast.unparse(arg) for arg in node.args]) + expr: str = ast.unparse(node) + trace_path: list = self.tracer.trace_back(args) + path: str = "" + if trace_path: + path = " ->".join(trace_path) + for call_type in suspicious_calls: + if self._is_malware(suspicious_calls[call_type], function_call): + for constant_type in suspicious_const: # Further confirmed by checking the arguments + if ( + self._is_malware(suspicious_const[constant_type], args) + or IP().extract_public_ipv4(args) + or self._is_malware(suspicious_const[constant_type], Decryptor().base64_decode(args)) + ): + self._summarize_analysis_detail(call_type, node.lineno, expr) + self.is_malware = True + elif self._is_malware(suspicious_const[constant_type], path): + self._summarize_analysis_detail(call_type, node.lineno, expr, path) + self.is_malware = True + self.generic_visit(node) + + def visit_ClassDef(self, node: ast.ClassDef) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit the ClassDef node. This function is used to detect malicious behavior in setup.py.""" + if not node.bases: + self.generic_visit(node) + return + + for base in node.bases: + if isinstance(base, ast.Name): + if base.id == "install": + # TODO: Not pretty sure including this in setup.py means it is a malware, so the self.is_malware is not updated. 
+ self.analysis_detail["Custom Setup"][node.lineno] = node.name + self.generic_visit(node) + + def _summarize_analysis_detail( + self, function_call_type: str, lineno: int, expr: str, trace_path: str | None = None + ) -> None: + """Store the analysis result in based on different type of malicious behaviour. + + Parameters + ---------- + function_call_type: str + The suspcious function call type. + lineno: int + The location of the source code block. + expr: str + The source code block. + trace_path: str + The dataflow path. + """ + detail = [expr] + + if trace_path: + detail.append(trace_path) + + match function_call_type: + case "code_execution": + self.analysis_detail["Code Execution"][lineno] = detail + case "info_collecting": + self.analysis_detail["Information Collecting"][lineno] = detail + case "remote_connection": + self.analysis_detail["Remote Connection"][lineno] = detail + case "obfuscation": + self.analysis_detail["Obfuscation"][lineno] = detail + + def _is_malware(self, malicious_pattern: list, target: str | None) -> bool: + """Check the source code matched the suspicious pattern. + + Parameters + ---------- + malicious_pattern: list + A collection of the suspicious source code. + target: str + The componenet of the source code block. + + Returns + ------- + bool + The result. + """ + if not target: + return False + for _ in malicious_pattern: # pylint: disable=C0103, C0501 + if _ in target: + return True + return False + + def analyze(self, source_code: str) -> tuple[bool, dict]: + """Analyze the source code.""" + tree = ast.parse(source_code) + self.visit(tree) + return self.is_malware, self.analysis_detail + + +class Decryptor: + """This class includes multiple built-in decryption methods.""" + + # Only decrypt the string with the built-in decrypt method; otherwise, provide the source code + # for the user. And notify them to decrypt using the corresponding decrypt method + # TODO: Implement more decryption method. 
+ + def __init__(self) -> None: + pass + + def base64_decode(self, encoded_value: str | bytes) -> str | None: + """Decode the encoded value.""" + try: + decoded_bytes = base64.b64decode(encoded_value) + return decoded_bytes.decode("utf-8") + except (binascii.Error, UnicodeDecodeError): + return None + + +class IP: + """This class provides the method to identify the IP in the source code.""" + + def __init__(self) -> None: + pass + + def is_valid_public_ipv4(self, ip: str) -> bool: + """Check whether it is a public IPv4.""" + try: + ip_obj = ipaddress.ip_address(ip) + return ip_obj.version == 4 and not ip_obj.is_private and not ip_obj.is_loopback + except ValueError: + # If ip_address() raises an error, it's not a valid IP + return False + + def extract_public_ipv4(self, text: str) -> list: + """Extract the public IPv4 from the source code.""" + ipv4_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b" + all_ips = re.findall(ipv4_pattern, text) + # Filter valid public IPv4 addresses + valid_public_ipv4s = [] + for ip in all_ips: + if self.is_valid_public_ipv4(ip): + valid_public_ipv4s.append(ip) + return valid_public_ipv4s diff --git a/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml b/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml new file mode 100644 index 000000000..9c15144d4 --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml @@ -0,0 +1,101 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + + +#This file defines the malicious pattern. +#The pattern is collected from the malware repository of Pypi.org. 
+imports: +- requests +- base64 +- Fernet +- telebot +- platform +- ClientSession +- socket +- os +- getpass +- telegram +- __pyarmor__ +- urllib.request.urlopen +- subprocess +- Request + +ast_calls: + os_detection: + - os.name + code_execution: + - exec + - subprocess.run + - subprocess.call + - subprocess.Popen + - subprocess.check_call + - os.system + info_collecting: + - os.getcwd + - os.getlogin + - os.getenv + - os.environ + - os.uname + - getpass.getuser + - socket.gethostname + - platform.node + - platform.system + - platform.version + - keyboard.on_release + obfuscation: + - base64.b64decode + - __pyarmor__ + # - Fernet.decrypt + remote_connection: + - requests.get + - requests.post + - telegram.send_document + - urllib.request.urlopen + - urllib.request.urlretrieve + - Request + - socket.socket + custom_setup: + - install + reverse_shell: + - os.dup2 + +ast_constant: + domains: + - webhook.site + - discord + - cdn.discordapp.com + - oast.fun + - api.telegram.org + - diddlydingusdu.de # builderknower2 + - pipedream.net # business-kpi-manager + - 2.tcp.ngrok.io + - files.pypihosted.org + - filebin.net + - akinasouls.fr + - api.ipify.org # Get public IP of the victim + - httpbin.or + - ngrok.ap + - oastify.com + - pythonanywhere.com + - deliverycontent.online + local_path: + - /storage/emulated/0 # Android: primary user account on the device + - /etc/resolv.conf # DNS + - /etc/hosts # DNS + - /sys/class/net # Network related + - /run/systemd/resolve/stub-resolv.conf + - /sdcard/DCIM # Photo storage + executable: + - .exe + windows: + - APPDATA + - Start-Process # Execute command + - powershell + reverse_shell: + - /dev/tcp + os_detection: + - nt # Windows + - Windows + - Darwin # MacOS + - Linux + - posix # Linux diff --git a/src/macaron/slsa_analyzer/analyze_context.py b/src/macaron/slsa_analyzer/analyze_context.py index 31da3d54c..c2f6a0042 100644 --- a/src/macaron/slsa_analyzer/analyze_context.py +++ b/src/macaron/slsa_analyzer/analyze_context.py @@ 
-57,6 +57,8 @@ class ChecksOutputs(TypedDict): """True if the provenance exists and has been verified against a signed companion provenance.""" local_artifact_paths: list[str] """The local artifact absolute paths.""" + validate_malware_switch: bool + """True when the malware validation is enabled.""" class AnalyzeContext: @@ -113,6 +115,7 @@ def __init__( provenance_commit_digest=None, provenance_verified=False, local_artifact_paths=[], + validate_malware_switch=False, ) @property diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index e95c29a5a..894c82134 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -125,6 +125,7 @@ def run( sbom_path: str = "", deps_depth: int = 0, provenance_payload: InTotoPayload | None = None, + validate_malware_switch: bool = False, ) -> int: """Run the analysis and write results to the output path. @@ -173,6 +174,7 @@ def run( main_config, analysis, provenance_payload=provenance_payload, + validate_malware_switch=validate_malware_switch, ) if main_record.status != SCMStatus.AVAILABLE or not main_record.context: @@ -290,6 +292,7 @@ def run_single( analysis: Analysis, existing_records: dict[str, Record] | None = None, provenance_payload: InTotoPayload | None = None, + validate_malware_switch: bool = False, ) -> Record: """Run the checks for a single repository target. 
def validate_malware(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[bool, dict[str, JsonType] | None]:
    """Run the source code analysis on the package.

    Parameters
    ----------
    pypi_package_json: PyPIPackageJsonAsset
        The package asset handed to the source code analyzer.

    Returns
    -------
    tuple[bool, dict[str, JsonType] | None]
        The verdict and the findings, exactly as returned by ``PyPISourcecodeAnalyzer.analyze``.
        The first item is True if the source code includes a suspicious pattern. The second item
        holds the result of the validation including the line number and the suspicious arguments,
        e.g. for requests.get("http://malicious.com") it contains "http://malicious.com".
    """
    # TODO: This redundant function might be removed
    malicious, findings = PyPISourcecodeAnalyzer(pypi_package_json).analyze()
    return malicious, findings
def fetch_sourcecode(self, src_url: str) -> dict[str, str] | None:
    """Download a source archive and return the text of the Python files it contains.

    Parameters
    ----------
    src_url: str
        The URL of the source archive (a tar archive with any compression, or a zip archive).

    Returns
    -------
    dict[str, str] | None
        A mapping from archive member name to file content for every non-empty ``.py`` member.
        The mapping is empty when the file is neither a tar nor a zip archive.
        None if the download fails or the archive cannot be read.
    """
    # The archive is stored under the last path segment of the URL.
    _, _, file_name = src_url.rpartition("/")
    if not file_name:
        logger.debug("Unable to determine the file name from URL: %s", src_url)
        return None

    def decode_source(raw: bytes) -> str:
        # Packages under analysis are untrusted and may ship files that are not valid UTF-8.
        # Undecodable bytes are replaced so that a single file cannot abort the whole analysis.
        return raw.decode("utf-8", errors="replace")

    # Create a temporary directory to store the downloaded source.
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            response = requests.get(src_url, stream=True, timeout=40)
        except RequestException as error:
            # Connection errors and timeouts are not HTTPError, so catch the common base class.
            logger.debug("Error while requesting source file: %s", error)
            return None

        source_file = os.path.join(temp_dir, file_name)
        # A streamed response keeps its connection open until it is closed explicitly.
        with response:
            try:
                response.raise_for_status()
            except requests.exceptions.HTTPError as http_err:
                logger.debug("HTTP error occurred: %s", http_err)
                return None

            if response.status_code != 200:
                return None

            try:
                with open(source_file, "wb") as file:
                    # iter_content() yields a single byte per iteration unless a chunk size is given.
                    for chunk in response.iter_content(chunk_size=64 * 1024):
                        file.write(chunk)
            except RequestException as error:
                # Something went wrong with the request, abort.
                logger.debug("Error while streaming source file: %s", error)
                return None

        logger.debug("Begin fetching the source code from PyPI")
        py_files_content: dict[str, str] = {}
        if tarfile.is_tarfile(source_file):
            try:
                # "r:*" detects the compression; "r:gz" fails on tarballs that are not gzipped.
                with tarfile.open(source_file, "r:*") as tar:
                    for member in tar.getmembers():
                        if not (member.isfile() and member.name.endswith(".py") and member.size > 0):
                            continue
                        file_obj = tar.extractfile(member)
                        if file_obj is None:
                            continue
                        with file_obj:
                            py_files_content[member.name] = decode_source(file_obj.read())
            except (tarfile.TarError, OSError, EOFError) as exception:
                # Corrupt or truncated archives surface as different error types.
                logger.debug("Error reading tar file: %s", exception)
                return None
        elif zipfile.is_zipfile(source_file):
            try:
                with zipfile.ZipFile(source_file, "r") as zip_ref:
                    for info in zip_ref.infolist():
                        if not info.filename.endswith(".py") or info.is_dir() or info.file_size <= 0:
                            continue
                        with zip_ref.open(info) as file_obj:
                            py_files_content[info.filename] = decode_source(file_obj.read())
            except zipfile.BadZipFile as bad_zip_exception:
                logger.debug("Error reading zip file: %s", bad_zip_exception)
                return None
            except zipfile.LargeZipFile as large_zip_exception:
                logger.debug("Zip file too large to read: %s", large_zip_exception)
                return None
        else:
            # Nothing was extracted, so do not report success.
            logger.debug("Unable to extract file: %s", file_name)
            return py_files_content

        logger.debug("Successfully fetch the source code from PyPI")
        return py_files_content
+ + Returns + ------- + dict[str, str] | None + The source code of each script in the package + """ + url: str | None = self.get_sourcecode_url() + if url: + source_code: dict[str, str] | None = self.pypi_registry.fetch_sourcecode(url) + return source_code + return None