Skip to content

Commit

Permalink
Merge pull request anchore#1254 from anchore/remove_unused_togrype
Browse files Browse the repository at this point in the history
[tech-debt] Removes unused to_grype() methods from PackageMapper
  • Loading branch information
zhill authored Nov 15, 2021
2 parents 94a9d0b + 71c7544 commit 5f27192
Showing 1 changed file with 1 addition and 155 deletions.
156 changes: 1 addition & 155 deletions anchore_engine/services/policy_engine/engine/vulns/mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Vulnerability,
VulnerabilityMatch,
)
from anchore_engine.db import Image, ImageCpe, ImagePackage
from anchore_engine.db import Image
from anchore_engine.services.policy_engine.engine.vulns.utils import get_api_endpoint
from anchore_engine.subsys import logger as log
from anchore_engine.util.cpe_generators import (
Expand Down Expand Up @@ -51,26 +51,6 @@ def __init__(self, engine_type, grype_type, grype_language=None):
# default language to blank string
self.grype_language = grype_language if grype_language else ""

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
artifact = {
"id": str(uuid.uuid4()),
"name": image_package.name,
"version": image_package.fullversion,
"type": self.grype_type,
"locations": [
{
"path": image_package.pkg_path,
}
],
# Package type specific mappers add metadata attribute
}

return artifact

def image_content_to_grype_sbom(self, record: Dict):
"""
Creates a grype sbom formatted record from a single image content API response record
Expand Down Expand Up @@ -98,25 +78,6 @@ class KBMapper(PackageMapper):
def __init__(self):
super(KBMapper, self).__init__(engine_type="kb", grype_type="msrc-kb")

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
"""
Convert this image package to a sbom artifact suitable for Grype input.
The name is the Microsoft Product ID, which is the Windows OS version.
The version is the KB (Knowledge Base) ID, which is the identifier for the patch.
"""
artifact = {
"id": str(uuid.uuid4()),
"name": image_package.src_pkg,
"version": image_package.version,
"type": self.grype_type,
"locations": [{"path": image_package.pkg_path}],
}
return artifact

def image_content_to_grype_sbom(self, record: Dict):
artifact = {
"id": str(uuid.uuid4()),
Expand All @@ -142,39 +103,6 @@ class RpmMapper(LinuxDistroPackageMapper):
def __init__(self):
super(RpmMapper, self).__init__(engine_type="rpm", grype_type="rpm")

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
"""
Adds the source package information to grype sbom
Source package has already been through 2 transformations before this point
1. From syft sbom to analyzer manifest in anchore_engine/analyzers/syft/handlers/rpm.py
2. From analyzer manifest to policy-engine ImagePackage in anchore_engine/services/policy_engine/engine/loaders.py
After the above transformations ImagePackage.src_pkg is the equivalent of syft/grype sourceRpm
"""
artifact = super().to_grype(image_package, location_cpes_dict)
if image_package.normalized_src_pkg != "N/A":
artifact["metadataType"] = "RpmdbMetadata"
artifact["metadata"] = {"sourceRpm": image_package.src_pkg}

# This epoch handling is necessary because in RPMs the epoch of the binary package is often not part of the
# sourceRpm name, but Grype needs it to do the version comparison correctly.
# Without this step Grype will use the wrong version string for the vulnerability match
epoch_str, version, release = split_fullversion(image_package.fullversion)
if epoch_str:
try:
artifact["epoch"] = int(epoch_str)
except ValueError:
# not a valid epoch, something went wrong
pass

return artifact

def image_content_to_grype_sbom(self, record: Dict):
"""
Adds the source package information to grype sbom
Expand Down Expand Up @@ -221,48 +149,6 @@ class DpkgMapper(LinuxDistroPackageMapper):
def __init__(self):
super(DpkgMapper, self).__init__(engine_type="dpkg", grype_type="deb")

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
"""
Adds the source package information to grype sbom
Source package has already been through 2 transformations before this point. These transformations make the final result more error prone unlike rpm
1. From syft sbom to analyzer manifest in anchore_engine/analyzers/syft/handlers/debian.py. combines source package and source/package version into one value
2. From analyzer manifest to policy-engine ImagePackage in anchore_engine/services/policy_engine/engine/loaders.py. attempts to split the source package and version
After the above transformations ImagePackage.normalized_src_pkg is the closest approximation to syft/grype source.
Closest because the corner cases are not handled correctly by the above transformations
Example of not-working case
Syft output of bsdutils package
{
"name": "bsdutils",
"version": "1:2.20.1-5.3",
"type": "deb",
"foundBy": "dpkgdb-cataloger",
"metadataType": "DpkgMetadata",
"metadata": {
"package": "bsdutils",
"source": "util-linux",
"version": "1:2.20.1-5.3",
"sourceVersion": "2.20.1-5.3",
...
Notice version and sourceVersion are different because of the epoch
- Step 1 processes this into sourcepkg=util-linux-2.20.1-5.3
- Step 2 falters in it's processing because of the epoch difference and saves util-linux-2.20.1-5.3 to both
ImagePackage.src_pkg and ImagePackage.normalized_src_pkg. The correct value for normalized_src_pkg is util-linux
"""
artifact = super().to_grype(image_package, location_cpes_dict)
if image_package.normalized_src_pkg != "N/A":
artifact["metadataType"] = "DpkgMetadata"
artifact["metadata"] = {"source": image_package.normalized_src_pkg}
return artifact

def image_content_to_grype_sbom(self, record: Dict):
"""
Adds the source package information to grype sbom
Expand All @@ -284,21 +170,6 @@ class ApkgMapper(LinuxDistroPackageMapper):
def __init__(self):
super(ApkgMapper, self).__init__(engine_type="APKG", grype_type="apk")

def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[str]] = None,
):
artifact = super().to_grype(image_package, location_cpes_dict)

# pkgdb/ prefix is added to all os package locations, it's the only way to associate a package with it's cpes
cpes = location_cpes_dict.get(f"pkgdb/{image_package.name}")
if cpes:
# populate cpes for os packages
artifact["cpes"] = [cpe.get_cpe23_fs_for_sbom() for cpe in cpes]

return artifact

def image_content_to_grype_sbom(self, record: Dict):
"""
Adds the origin package information to grype sbom
Expand All @@ -316,31 +187,6 @@ def image_content_to_grype_sbom(self, record: Dict):


class CPEMapper(PackageMapper):
def to_grype(
self,
image_package: ImagePackage,
location_cpes_dict: Dict[str, List[ImageCpe]] = None,
):
cpes = location_cpes_dict.get(image_package.pkg_path)
if cpes:
artifact = {
"id": str(uuid.uuid4()),
"name": image_package.name,
"type": self.grype_type,
"language": self.grype_language,
"locations": [
{
"path": image_package.pkg_path,
}
],
"cpes": [cpe.get_cpe23_fs_for_sbom() for cpe in cpes],
"version": cpes[0].version, # set the version explicitly for grype
}

return artifact
else:
raise ValueError("No CPEs found for package={}".format(image_package.name))

def fallback_cpe_generator(self, record: Dict) -> List[str]:
return generate_fuzzy_cpes(
record.get("package"), record.get("version"), self.engine_type
Expand Down

0 comments on commit 5f27192

Please sign in to comment.