Skip to content
This repository has been archived by the owner on Jan 27, 2023. It is now read-only.

[tech-debt] Removes unused to_grype() methods from PackageMapper #1254

Merged
merged 1 commit into from
Nov 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 1 addition & 155 deletions anchore_engine/services/policy_engine/engine/vulns/mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Vulnerability,
VulnerabilityMatch,
)
from anchore_engine.db import Image, ImageCpe, ImagePackage
from anchore_engine.db import Image
from anchore_engine.services.policy_engine.engine.vulns.utils import get_api_endpoint
from anchore_engine.subsys import logger as log
from anchore_engine.util.cpe_generators import (
Expand Down Expand Up @@ -51,26 +51,6 @@ def __init__(self, engine_type, grype_type, grype_language=None):
# default language to blank string
self.grype_language = grype_language if grype_language else ""

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
artifact = {
"id": str(uuid.uuid4()),
"name": image_package.name,
"version": image_package.fullversion,
"type": self.grype_type,
"locations": [
{
"path": image_package.pkg_path,
}
],
# Package type specific mappers add metadata attribute
}

return artifact

def image_content_to_grype_sbom(self, record: Dict):
"""
Creates a grype sbom formatted record from a single image content API response record
Expand Down Expand Up @@ -98,25 +78,6 @@ class KBMapper(PackageMapper):
def __init__(self):
super(KBMapper, self).__init__(engine_type="kb", grype_type="msrc-kb")

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
"""
Convert this image package to a sbom artifact suitable for Grype input.
The name is the Microsoft Product ID, which is the Windows OS version.
The version is the KB (Knowledge Base) ID, which is the identifier for the patch.
"""
artifact = {
"id": str(uuid.uuid4()),
"name": image_package.src_pkg,
"version": image_package.version,
"type": self.grype_type,
"locations": [{"path": image_package.pkg_path}],
}
return artifact

def image_content_to_grype_sbom(self, record: Dict):
artifact = {
"id": str(uuid.uuid4()),
Expand All @@ -142,39 +103,6 @@ class RpmMapper(LinuxDistroPackageMapper):
def __init__(self):
super(RpmMapper, self).__init__(engine_type="rpm", grype_type="rpm")

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
"""
Adds the source package information to grype sbom

Source package has already been through 2 transformations before this point
1. From syft sbom to analyzer manifest in anchore_engine/analyzers/syft/handlers/rpm.py
2. From analyzer manifest to policy-engine ImagePackage in anchore_engine/services/policy_engine/engine/loaders.py

After the above transformations ImagePackage.src_pkg is the equivalent of syft/grype sourceRpm

"""
artifact = super().to_grype(image_package, location_cpes_dict)
if image_package.normalized_src_pkg != "N/A":
artifact["metadataType"] = "RpmdbMetadata"
artifact["metadata"] = {"sourceRpm": image_package.src_pkg}

# This epoch handling is necessary because in RPMs the epoch of the binary package is often not part of the
# sourceRpm name, but Grype needs it to do the version comparison correctly.
# Without this step Grype will use the wrong version string for the vulnerability match
epoch_str, version, release = split_fullversion(image_package.fullversion)
if epoch_str:
try:
artifact["epoch"] = int(epoch_str)
except ValueError:
# not a valid epoch, something went wrong
pass

return artifact

def image_content_to_grype_sbom(self, record: Dict):
"""
Adds the source package information to grype sbom
Expand Down Expand Up @@ -221,48 +149,6 @@ class DpkgMapper(LinuxDistroPackageMapper):
def __init__(self):
super(DpkgMapper, self).__init__(engine_type="dpkg", grype_type="deb")

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
"""
Adds the source package information to grype sbom

Source package has already been through 2 transformations before this point. These transformations make the final result more error prone unlike rpm
1. From syft sbom to analyzer manifest in anchore_engine/analyzers/syft/handlers/debian.py. combines source package and source/package version into one value
2. From analyzer manifest to policy-engine ImagePackage in anchore_engine/services/policy_engine/engine/loaders.py. attempts to split the source package and version

After the above transformations ImagePackage.normalized_src_pkg is the closest approximation to syft/grype source.
Closest because the corner cases are not handled correctly by the above transformations

Example of not-working case
Syft output of bsdutils package
{
"name": "bsdutils",
"version": "1:2.20.1-5.3",
"type": "deb",
"foundBy": "dpkgdb-cataloger",
"metadataType": "DpkgMetadata",
"metadata": {
"package": "bsdutils",
"source": "util-linux",
"version": "1:2.20.1-5.3",
"sourceVersion": "2.20.1-5.3",
...

Notice version and sourceVersion are different because of the epoch
- Step 1 processes this into sourcepkg=util-linux-2.20.1-5.3
- Step 2 falters in it's processing because of the epoch difference and saves util-linux-2.20.1-5.3 to both
ImagePackage.src_pkg and ImagePackage.normalized_src_pkg. The correct value for normalized_src_pkg is util-linux

"""
artifact = super().to_grype(image_package, location_cpes_dict)
if image_package.normalized_src_pkg != "N/A":
artifact["metadataType"] = "DpkgMetadata"
artifact["metadata"] = {"source": image_package.normalized_src_pkg}
return artifact

def image_content_to_grype_sbom(self, record: Dict):
"""
Adds the source package information to grype sbom
Expand All @@ -284,21 +170,6 @@ class ApkgMapper(LinuxDistroPackageMapper):
def __init__(self):
super(ApkgMapper, self).__init__(engine_type="APKG", grype_type="apk")

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
artifact = super().to_grype(image_package, location_cpes_dict)

# pkgdb/ prefix is added to all os package locations, it's the only way to associate a package with it's cpes
cpes = location_cpes_dict.get(f"pkgdb/{image_package.name}")
if cpes:
# populate cpes for os packages
artifact["cpes"] = [cpe.get_cpe23_fs_for_sbom() for cpe in cpes]

return artifact

def image_content_to_grype_sbom(self, record: Dict):
"""
Adds the origin package information to grype sbom
Expand All @@ -316,31 +187,6 @@ def image_content_to_grype_sbom(self, record: Dict):


class CPEMapper(PackageMapper):
def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[ImageCpe]] = None,
):
cpes = location_cpes_dict.get(image_package.pkg_path)
if cpes:
artifact = {
"id": str(uuid.uuid4()),
"name": image_package.name,
"type": self.grype_type,
"language": self.grype_language,
"locations": [
{
"path": image_package.pkg_path,
}
],
"cpes": [cpe.get_cpe23_fs_for_sbom() for cpe in cpes],
"version": cpes[0].version, # set the version explicitly for grype
}

return artifact
else:
raise ValueError("No CPEs found for package={}".format(image_package.name))

def fallback_cpe_generator(self, record: Dict) -> List[str]:
return generate_fuzzy_cpes(
record.get("package"), record.get("version"), self.engine_type
Expand Down