diff --git a/ci/test_files_touched.py b/ci/test_files_touched.py index fb823092..07d6f88b 100644 --- a/ci/test_files_touched.py +++ b/ci/test_files_touched.py @@ -48,8 +48,12 @@ # tern/classes re.compile('tern/classes/command.py'): ['python tests/test_class_command.py'], + re.compile('tern/classes/oci_image.py'): + ['tern report -i photon:3.0', + 'python tests/test_class_oci_image.py'], re.compile('tern/classes/docker_image.py'): - ['tern report -i photon:3.0'], + ['tern report -d samples/alpine_python/Dockerfile', + 'python tests/test_class_docker_image.py'], re.compile('tern/classes/file_data.py'): ['python tests/test_class_file_data.py'], re.compile('tern/classes/image.py'): @@ -121,6 +125,8 @@ ['python tests/test_analyze_default_dockerfile_parse.py'], re.compile('tests/test_class_command.py'): ['python tests/test_class_command.py'], + re.compile('tests/test_class_oci_image.py'): + ['python tests/test_class_oci_image.py'], re.compile('tests/test_class_docker_image.py'): ['python tests/test_class_docker_image.py', 'tern report -w photon.tar'], diff --git a/tern/classes/docker_image.py b/tern/classes/docker_image.py index 724dbd51..8734d5a2 100644 --- a/tern/classes/docker_image.py +++ b/tern/classes/docker_image.py @@ -157,6 +157,7 @@ def load_image(self, load_until_layer=0): if (self.load_until_layer >= layer_count or self.load_until_layer == 0): layer.set_checksum(checksum_type, layer.diff_id) + layer.image_layout = "docker" layer.gen_fs_hash() layer.layer_index = layer_count self._layers.append(layer) diff --git a/tern/classes/oci_image.py b/tern/classes/oci_image.py new file mode 100644 index 00000000..0768b362 --- /dev/null +++ b/tern/classes/oci_image.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2021 VMware, Inc. All Rights Reserved. +# SPDX-License-Identifier: BSD-2-Clause + +import json +import subprocess # nosec + +from tern.utils import rootfs +from tern.utils import general +from tern.classes.image import Image +from tern.utils.constants import manifest_file +from tern.classes.image_layer import ImageLayer + + +class OCIImage(Image): + """A representation of an OCI compatible image that exists on disk""" + def __init__(self, repotag=None): + super().__init__(repotag) + # In case the OCI image corresponds with an image built by Docker + # we also include the history + self.__history = None + if self.repotag is None: + raise NameError("Image object initialized with no repotag") + + # parse the repotag + repo_dict = general.parse_image_string(self._repotag) + self._name = repo_dict.get('name') + self._tag = repo_dict.get('tag') + self.set_checksum( + repo_dict.get('digest_type'), repo_dict.get('digest')) + + @property + def history(self): + return self.__history + + def to_dict(self, template=None): + # this should take care of 'origins' and 'layers' + oci_dict = super().to_dict(template) + return oci_dict + + def get_image_manifest(self): + temp_path = rootfs.get_working_dir() + with general.pushd(temp_path): + with open(manifest_file, encoding='utf-8') as f: + json_obj = json.loads(f.read()) + return json_obj + + def get_image_layers(self, manifest): + layers = [] + for layer in manifest.get('layers'): + layers.append(layer.get("digest").split(":")[1]) + return layers + + def get_image_config_file(self, manifest): + return manifest.get('config').get("digest").split(":")[1] + + def get_image_config(self, manifest): + config_file = self.get_image_config_file(manifest) + temp_path = rootfs.get_working_dir() + with general.pushd(temp_path): + with open(config_file, encoding='utf-8') as f: + json_obj = json.loads(f.read()) + return json_obj + + def get_image_history(self, config): + if 'history' in config.keys(): + return config['history'] + return None + + def get_diff_ids(self, config): + diff_ids = [] + for item in config['rootfs']['diff_ids']: + diff_ids.append(item.split(':').pop()) + return diff_ids + + def get_diff_checksum_type(self, config): + '''Get the checksum type that was used to calculate the diff_id + of the image''' + return config['rootfs']['diff_ids'][0].split(':')[0] + + def set_layer_created_by(self): + # the history is ordered according to the order of the layers + # so the first non-empty history corresponds with the first layer + index = 0 + for item in self.__history: + if 'empty_layer' not in item.keys(): + if 'created_by' in item.keys(): + self._layers[index].created_by = item['created_by'] + else: + self._layers[index].created_by = '' + index = index + 1 + + def load_image(self, load_until_layer=0): + if load_until_layer > 0: + self._load_until_layer = load_until_layer + try: + self._manifest = self.get_image_manifest() + self._config = self.get_image_config(self._manifest) + self.__history = self.get_image_history(self._config) + layer_paths = self.get_image_layers(self._manifest) + layer_diffs = self.get_diff_ids(self._config) + # if the digest isn't in the repotag, get it from the config + if not self.checksum: + repo_dict = general.parse_image_string( + self._config.get("config").get("Image")) + self.set_checksum(repo_dict.get("digest_type"), + repo_dict.get("digest")) + layer_checksum_type = self.get_diff_checksum_type(self._config) + layer_count = 1 + while layer_diffs and layer_paths: + layer = ImageLayer(layer_diffs.pop(0), layer_paths.pop(0)) + if (self.load_until_layer >= layer_count + or self.load_until_layer == 0): + layer.set_checksum(layer_checksum_type, layer.diff_id) + layer.image_layout = "oci" + # take care to set the layer index as it will be used + # to create the directory where the layer contents will + # be untarred + layer.layer_index = layer_count + layer.gen_fs_hash() + self._layers.append(layer) + layer_count = layer_count + 1 + self._total_layers = layer_count - 1 + if self.load_until_layer > self.total_layers: + # if user asked to analyze more layers than image has + # turn off the load_until_layer feature + self._load_until_layer = 0 + self.set_layer_created_by() + except NameError as e: + raise NameError(e) from e + except subprocess.CalledProcessError as e: + raise subprocess.CalledProcessError( + e.returncode, cmd=e.cmd, output=e.output, stderr=e.stderr) + except IOError as e: + raise IOError(e) from e diff --git a/tern/load/skopeo.py b/tern/load/skopeo.py new file mode 100644 index 00000000..89651998 --- /dev/null +++ b/tern/load/skopeo.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2021 VMware, Inc. All Rights Reserved. +# SPDX-License-Identifier: BSD-2-Clause + +""" +Interactions with remote container images using skopeo +""" + +import logging +import sys +import shutil + +from tern.utils import constants +from tern.utils import rootfs + +# global logger +logger = logging.getLogger(constants.logger_name) + + +def check_skopeo_setup(): + """Check if the skopeo tool is installed""" + if not shutil.which('skopeo'): + logger.critical('Skopeo is not installed') + logger.critical('Exiting...') + sys.exit(1) + + +def pull_image(image_tag_string): + """Use skopeo to pull a remote image into the working directory""" + # Check if skopeo is set up + check_skopeo_setup() + # we will assume the docker transport for now + remote = f'docker://{image_tag_string}' + local = f'dir:{rootfs.get_working_dir()}' + logger.debug("Attempting to pull image \"%s\"", image_tag_string) + result, error = rootfs.shell_command( + False, ['skopeo', 'copy', remote, local]) + if error: + logger.error("Error when downloading image: \"%s\"", error) + return None + return result diff --git a/tests/test_class_docker_image.py b/tests/test_class_docker_image.py index 62da7588..7f59ed5a 100644 --- a/tests/test_class_docker_image.py +++ b/tests/test_class_docker_image.py @@ -94,6 +94,7 @@ def testGetLayerDiffIds(self): def testLayerFiles(self): self.image.load_image() self.assertFalse(self.image.layers[0].files) + print(self.image.layers[0].image_layout) self.image.layers[0].add_files() for file in self.image.layers[0].files: self.assertTrue( diff --git a/tests/test_class_oci_image.py b/tests/test_class_oci_image.py new file mode 100644 index 00000000..3afa67bf --- /dev/null +++ b/tests/test_class_oci_image.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2021 VMware, Inc. All Rights Reserved. +# SPDX-License-Identifier: BSD-2-Clause + +import unittest + +from tern.load import skopeo +from tern.classes.oci_image import OCIImage +from tern.utils import rootfs +from test_fixtures import create_working_dir +from test_fixtures import remove_working_dir + + +class TestClassOCIImage(unittest.TestCase): + + def setUp(self): + '''Using a specific image here. If this test fails due to the image + not being found anymore, pick a different image to test against + For now use Docker to pull the image from Dockerhub''' + create_working_dir() + rootfs.set_working_dir() + # this should check if the docker image extraction is successful + skopeo.pull_image('vmware/tern@sha256:20b32a9a20752aa1ad7582c667704f' + 'da9f004cc4bfd8601fac7f2656c7567bb4') + self.image = OCIImage('vmware/tern@sha256:20b32a9a20752aa1ad7582c6' + '67704fda9f004cc4bfd8601fac7f2656c7567bb4') + # constants for this image + self.layer = ('c1c3a87012e7ff5791b31e94515b661' + 'cdf06f6d5dc2f9a6245eda8774d257a13') + self.no_layers = 1 + self.created_by = ('/bin/sh -c #(nop) ADD ' + 'file:92137e724f46c720d8083a11290c67' + 'd9daa387e523336b1757a0e3c4f5867cd5 ' + 'in / ') + self.file_info = [ + ('file2.txt', 'documents/test/file2.txt', + '9710f003d924890c7677b4dd91fd753f6ed71cc57d4f' + '9482261b6786d81957fa', + 'sha256'), + ('file2.txt', 'documents/test/test2/file2.txt', + '885000512dee8ac814641bbf6a7c887012ec23a2fb3e' + '3b2cff583c45a611317d', + 'sha256'), + ('file1.txt', 'documents/test/test2/file1.txt', + '885000512dee8ac814641bbf6a7c887012ec' + '23a2fb3e3b2cff583c45a611317d', + 'sha256'), + ('file1.txt', 'documents/test/file1.txt', + 'a3cccbc52486d50a86ff0bc1e6ea0e0b701ac' + '4bb139f8713fa136ef9ec68e97e', + 'sha256') + ] + + def tearDown(self): + del self.image + remove_working_dir() + + def testInstance(self): + self.assertEqual(self.image.repotag, 'vmware/tern@sha256:20b32a9a2' + '0752aa1ad7582c667704fda9f004cc4' + 'bfd8601fac7f2656c7567bb4') + self.assertEqual(self.image.name, 'vmware/tern') + self.assertEqual(self.image.tag, '') + self.assertTrue(self.image.checksum_type, 'sha256') + self.assertTrue(self.image.checksum, '20b32a9a20752aa1ad7582c66' + '7704fda9f004cc4bfd8601fac7' + 'f2656c7567bb4') + self.assertFalse(self.image.manifest) + self.assertFalse(self.image.config) + self.assertFalse(self.image.layers) + self.assertFalse(self.image.history) + # test instantiating with a tag + o = OCIImage('vmware/tern:testimage') + self.assertEqual(o.name, 'vmware/tern') + self.assertEqual(o.tag, 'testimage') + self.assertFalse(o.checksum_type) + self.assertFalse(o.checksum) + + def testLoadImage(self): + self.image.load_image() + self.assertEqual(self.image.layers[0].diff_id, self.layer) + self.assertEqual(len(self.image.layers), self.no_layers) + self.assertEqual(self.image.layers[0].created_by, self.created_by) + self.assertEqual(self.image.layers[0].checksum_type, 'sha256') + self.assertEqual(self.image.layers[0].checksum, self.layer) + + def testGetLayerDiffIds(self): + self.image.load_image() + self.assertEqual(len(self.image.get_layer_diff_ids()), self.no_layers) + self.assertEqual(self.image.get_layer_diff_ids()[0], self.layer) + + def testLayerFiles(self): + self.image.load_image() + self.assertFalse(self.image.layers[0].files) + self.image.layers[0].add_files() + for file in self.image.layers[0].files: + self.assertTrue( + (file.name, file.path, file.checksum, + file.checksum_type) in + self.file_info + ) + + +if __name__ == '__main__': + unittest.main()